Coverage for CIResults/run_import.py: 88%
597 statements
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 08:12 +0000
« prev ^ index » next coverage.py v7.10.2, created at 2025-08-06 08:12 +0000
from __future__ import annotations

import configparser
import copy
import datetime
import functools
import logging
import os
import sys
import time
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Callable, cast

import pytz
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import transaction
from django.db.models import Model, prefetch_related_objects
from django.db.models.query import QuerySet
from django.utils import timezone
from django.utils.functional import cached_property
25from CIResults.models import (
26 Build,
27 Component,
28 Issue,
29 IssueFilter,
30 IssueFilterAssociated,
31 KnownFailure,
32 Machine,
33 RunConfig,
34 RunConfigTag,
35 RunFilterStatistic,
36 Test,
37 TestResult,
38 TestSuite,
39 TestsuiteRun,
40 TextStatus,
41 UnknownFailure,
42)
44current_dir = os.path.dirname(os.path.realpath(__file__))
45sys.path.append(os.path.join(current_dir, "piglit"))
46from framework import backends as piglit_backends, results as piglit_results # noqa
# Set up logger
# Module-wide logger; INFO level and a stdout handler are configured here so the
# importer produces readable progress output when run as a management command.
logger = logging.getLogger("run_import")
logger.setLevel("INFO")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(
    logging.Formatter(
        # str.format-style ({}-style) formatter, e.g. "[2024-01-01 12:00] [INFO:run_import] msg"
        "[{asctime}] [{levelname}:{name}] {message}",
        style="{",
        datefmt="%Y-%m-%d %H:%M",
    )
)
logger.addHandler(stream_handler)
def str_to_list(string: str | None, separator: str = " ") -> list[str]:
    """Split *string* on *separator* and return the non-empty, stripped parts.

    Returns an empty list when *string* is None.

    BUGFIX: the emptiness check now happens AFTER stripping; previously a
    whitespace-only part (e.g. in "a, ,b" with separator ",") passed the
    pre-strip length check and produced an empty string in the result.
    """
    if string is None:
        return []
    return [stripped for part in string.split(separator) if (stripped := part.strip())]
# Single module-level validator instance, reused across calls
url_validator = URLValidator()


def validate_url(url: str | None) -> str | None:
    """Return *url* unchanged when it passes Django's URL validation, else None."""
    if url is None:
        return None
    try:
        url_validator(url)
    except ValidationError:
        return None
    return url
def get_relative_path_depth(path: Path, other_path: Path) -> int:
    """Return the (unsigned) difference in the number of components of the two paths."""
    depth_a = len(path.parts)
    depth_b = len(other_path.parts)
    return depth_a - depth_b if depth_a >= depth_b else depth_b - depth_a
# Pair of RunFilterStatistic objects: `current` is the stats row already in the DB
# (or None), `new` is the updated copy being built during an import (or None).
FilterStatsTuple = namedtuple("FilterStatsTuple", ("current", "new"))
def none_if_undefined(func: Callable[..., Any | None]) -> Callable[..., Any | None]:
    """Helper decorator for safe retrieving class attributes, if it is not certain, that they've been defined, yet.

    Any AttributeError raised by *func* is swallowed and None is returned instead.
    """

    # functools.wraps preserves the wrapped function's name/docstring/signature,
    # which the original decorator lost
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Any | None:
        try:
            return func(*args, **kwargs)
        except AttributeError:
            return None

    return wrapper
@dataclass(frozen=True)
class TestsuiteTestResult:
    """Immutable record of a single test's outcome within one testsuite run.

    Only the name, status and timing are mandatory; the command line, output
    streams, dmesg capture and external result URL are optional extras that
    some result backends provide.
    """

    name: str
    status: str
    start_time: datetime.datetime
    duration: datetime.timedelta
    command: str | None = None
    stdout: str | None = None
    stderr: str | None = None
    dmesg: str | None = None
    url: str | None = None
@dataclass(frozen=True)
class TestsuiteRunResults:
    """One testsuite run on one machine, holding every parsed test result."""

    testsuite: TestsuiteResults
    machine_name: str
    run_id: int
    test_results: list[TestsuiteTestResult]
    start_time: datetime.datetime
    duration: datetime.timedelta

    @staticmethod
    def get_results_url(testsuite: TestsuiteResults, run_id: int, machine_name: str, test_name: str) -> str | None:
        """Generates and returns the testresult's external URL"""
        formatted: str = testsuite.result_url_pattern.format(
            runconfig=testsuite.runconfig.name,
            testsuite_build=testsuite.build,
            run_id=run_id,
            test=test_name,
            machine=machine_name,
        )
        # Only hand back the URL when it actually validates
        return validate_url(formatted)

    @cached_property
    def tests_set(self) -> set[str]:
        """Names of every test present in this run (computed once, then cached)."""
        names: set[str] = set()
        for result in self.test_results:
            names.add(result.name)
        return names

    @cached_property
    def statuses_set(self) -> set[str]:
        """Distinct status strings reported by this run (computed once, then cached)."""
        return {result.status for result in self.test_results}
class PiglitResult(TestsuiteRunResults):
    """Testsuite run results loaded from a piglit results directory."""

    def __init__(self, testsuite: TestsuiteResults, machine_name: str, run_id: int, dir_name: str) -> None:
        """Parse the piglit results found in *dir_name* into TestsuiteTestResult objects.

        Timestamps are converted to aware UTC datetimes; the run duration is
        clamped to be at least the sum of the individual test durations.
        """
        test_results: list[TestsuiteTestResult] = []
        try:
            results: piglit_results.TestrunResult = piglit_backends.load(dir_name)
            test_duration_sum: datetime.timedelta = datetime.timedelta()
            test_name: str
            test: piglit_results.TestResult
            for test_name, test in results.tests.items():
                url: str | None = self.get_results_url(testsuite, run_id, machine_name, test_name)
                start_time: datetime.datetime = datetime.datetime.fromtimestamp(test.time.start, tz=pytz.utc)
                duration: datetime.timedelta = datetime.timedelta(seconds=test.time.total)
                test_duration_sum += duration
                test_results.append(
                    TestsuiteTestResult(
                        name=test_name,
                        status=test.result,
                        start_time=start_time,
                        duration=duration,
                        command=test.command,
                        # out/err may be non-str piglit objects; coerce to str
                        stdout=str(test.out),
                        stderr=str(test.err),
                        dmesg=test.dmesg,
                        url=url,
                    )
                )
            start: datetime.datetime = datetime.datetime.fromtimestamp(results.time_elapsed.start, tz=pytz.utc)
            duration = datetime.timedelta(seconds=results.time_elapsed.total)
            # Make sure the total duration is at least as long as the sum of all the test executions
            duration = max(duration, test_duration_sum)
        # NOTE(review): only pytz errors are handled here (falling back to an
        # epoch start and zero duration); piglit load errors propagate -- confirm
        # that this narrow catch is intentional
        except pytz.exceptions.Error:
            start = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
            duration = datetime.timedelta()
        super().__init__(testsuite, machine_name, run_id, test_results, start, duration)
class JsonResult(TestsuiteRunResults):
    """Testsuite run results supplied directly as TestsuiteTestResult objects."""

    def __init__(
        self, testsuite: TestsuiteResults, machine_name: str, run_id: int, test_results: list[TestsuiteTestResult]
    ) -> None:
        # No wall-clock information is available for this format: the start is
        # pinned to the UTC epoch and the run duration is the sum of the
        # individual test durations.
        total_duration: datetime.timedelta = sum(
            (result.duration for result in test_results), datetime.timedelta()
        )
        epoch_start: datetime.datetime = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
        super().__init__(testsuite, machine_name, run_id, test_results, epoch_start, total_duration)
@dataclass
class TestsuiteResults:
    """A testsuite declared in a runconfig, tied to its DB TestSuite and result format."""

    runconfig: RunConfigResults
    name: str
    build: str
    result_url_pattern: str
    format: str
    format_version: int | None = None
    # Resolved in __post_init__ from the build's component
    db_object: TestSuite = field(init=False)
    # Parser class selected from `format`/`format_version`
    _result_type: type[TestsuiteRunResults] = field(init=False)

    def __post_init__(self) -> None:
        # Check if the database contains the build, and then fetch the Testsuite associated with it
        build_object = Build.objects.get(name=self.build)
        self.db_object = TestSuite.objects.get(name=build_object.component)
        self._set_result_type()

    def _set_result_type(self) -> None:
        """Select the TestsuiteRunResults subclass matching the declared format."""
        if self.format == "piglit":
            # Only version 1 of the piglit format is implemented
            if self.format_version != 1:
                raise ValueError(
                    f"The version {self.format_version} of the testsuite result format '{self.format}' "
                    "is unsupported"
                )
            self._result_type = PiglitResult
        elif self.format == "json":
            self._result_type = JsonResult
        else:
            raise ValueError(f"The testsuite result format '{self.format}' is unsupported")

    def read_results(self, *args, **kwargs) -> TestsuiteRunResults:
        """Instantiate the selected parser class, forwarding all arguments to it."""
        if self._result_type not in (PiglitResult, JsonResult):
            raise ValueError(f"This operation is unsupported for the data format '{self.format}'")
        return self._result_type(self, *args, **kwargs)
@dataclass
class TestSuiteRunDef:
    """Definition of one testsuite run to import, as received from external arguments.

    The *_raw InitVar inputs may arrive as strings and are normalized to int in
    __post_init__, which also rejects None for every mandatory parameter.
    """

    testsuite_build: str
    results_format: str
    results_format_version_raw: InitVar[int | str]
    machine: str
    testsuite_run_id_raw: InitVar[int | str]
    testsuite_run_path: str
    # Normalized integer forms of the *_raw inputs
    results_format_version: int = field(init=False)
    testsuite_run_id: int = field(init=False)

    def __post_init__(self, results_format_version_raw: int | str, testsuite_run_id_raw: int | str) -> None:
        self.results_format_version = self._to_int("results_format_version_raw", results_format_version_raw)
        self.testsuite_run_id = self._to_int("testsuite_run_id_raw", testsuite_run_id_raw)
        for cls_field in ("testsuite_build", "results_format", "machine", "testsuite_run_id", "testsuite_run_path"):
            if getattr(self, cls_field) is None:
                raise ValueError(f"The parameter {cls_field} cannot be None")

    @staticmethod
    def _to_int(field_name: str, value: int | str) -> int:
        """Coerce *value* to int, raising a descriptive ValueError on failure."""
        try:
            return int(value)
        # Narrowed from a broad `except Exception` (int() only raises these two)
        # and chained with `from` so the original cause stays visible
        except (TypeError, ValueError) as e:
            raise ValueError(f"The parameter {field_name} '{value}' should be an integer") from e
@dataclass
class RunConfigResults:
    """Base container for everything parsed about a runconfig before the DB commit.

    Subclasses implement _import_results() to fill `run_results` from their
    respective sources (directory tree, explicit arguments, ...).
    """

    name: str
    url: str | None = None
    result_url_pattern: str = ""
    environment: str | None = None
    builds: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    temporary: bool = False
    run_results: list[TestsuiteRunResults] = field(default_factory=list)
    testsuites: dict[str, TestsuiteResults] = field(default_factory=dict, init=False)

    def _import_results(self, *args, **kwargs) -> None:
        # Each subclass defines its own way of loading results
        raise NotImplementedError

    @cached_property
    def tests(self) -> defaultdict[TestSuite, set[str]]:
        """Test names seen in the results, grouped by testsuite DB object."""
        grouped: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run_result in self.run_results:
            grouped[run_result.testsuite.db_object].update(run_result.tests_set)
        return grouped

    @cached_property
    def machines(self) -> set[str]:
        """Names of every machine that produced results."""
        names: set[str] = set()
        for run_result in self.run_results:
            names.add(run_result.machine_name)
        return names

    @cached_property
    def text_statuses(self) -> defaultdict[TestSuite, set[str]]:
        """Status strings seen in the results, grouped by testsuite DB object."""
        grouped: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run_result in self.run_results:
            grouped[run_result.testsuite.db_object].update(run_result.statuses_set)
        return grouped
class RunConfigResultsFromDir(RunConfigResults):
    """Runconfig results parsed from a directory containing a runconfig.ini file
    plus a <testsuite>/<machine>/<run_id>/ tree of result directories."""

    # Name of the mandatory main section in runconfig.ini
    _RUNCONFIG_SECTION_HEADER: str = "CIRESULTS_RUNCONFIG"

    def __init__(self, runconfig_dir: str) -> None:
        """Parse runconfig.ini in *runconfig_dir*, then import every run found below it.

        Raises ValueError on a missing/invalid runconfig file or inconsistent builds.
        """
        runconfig_path = os.path.join(runconfig_dir, "runconfig.ini")
        self._error_prefix = f"The RunConfig file {runconfig_path} is invalid: "

        runconfig_parser: configparser.ConfigParser = self._get_runconfig_parser(runconfig_path)
        main_section_proxy: configparser.SectionProxy = runconfig_parser[self._RUNCONFIG_SECTION_HEADER]
        name = main_section_proxy.get("name")
        if name is None:
            self._raise_parser_error("runconfig name unspecified")
        super().__init__(
            # cast is safe: _raise_parser_error above raised when name is None
            name=cast(str, name),
            url=validate_url(main_section_proxy.get("url")),
            result_url_pattern=main_section_proxy.get("result_url_pattern", ""),
            environment=main_section_proxy.get("environment"),
            builds=str_to_list(main_section_proxy.get("builds")),
            tags=str_to_list(main_section_proxy.get("tags")),
            temporary=main_section_proxy.getboolean("temporary", False),
        )
        # Parse the testsuite sections
        for section_name in runconfig_parser.sections():
            # Ignore the main section
            if section_name == self._RUNCONFIG_SECTION_HEADER:
                continue
            build: str = runconfig_parser[section_name]["build"]
            if build not in self.builds:
                self._raise_parser_error(
                    f"The build '{build}' of the testsuite '{section_name}' is not found "
                    f"in the list of builds of the runconfig {self.name}"
                )
            format: str = runconfig_parser[section_name]["format"]
            version: int = runconfig_parser[section_name].getint("version", 1)
            result_url_pattern: str = runconfig_parser[section_name].get("result_url_pattern", "")
            self.testsuites[section_name] = TestsuiteResults(
                self, section_name, build, result_url_pattern, format, version
            )
        self._import_results(runconfig_dir)

    def _import_results(self, runconfig_dir: str) -> None:
        """Walk the <testsuite>/<machine>/<run_id> tree and parse each run directory.

        NOTE: Path.walk() requires Python 3.12+. Depth relative to the root decides
        the meaning of each directory level; only depth 3 falls through to parsing.
        """
        runconfig_dir_path: Path = Path(runconfig_dir)
        testsuite_results: TestsuiteResults | None = None
        for dirpath, dirnames, _ in runconfig_dir_path.walk():
            relative_path_depth = get_relative_path_depth(runconfig_dir_path, dirpath)
            match relative_path_depth:
                case 1:  # Testsuite results directory level
                    if (testsuite_results := self.testsuites.get(dirpath.stem)) is None:
                        dirnames.clear()  # Clear directories list = stop Path.walk() from going deeper
                        logger.info(
                            "Ignore the testsuite '%s' because it is not listed in the runconfig file",
                            dirpath.stem,
                        )
                    continue
                case 2:  # Machine directory level
                    # NOTE(review): `machine` is set here and read at depth 3; walk()
                    # is top-down so a run dir is always preceded by its machine dir
                    machine: str = dirpath.stem
                    continue
                case 3:  # Run directory level
                    dirnames.clear()  # Clear directories list = stop Path.walk() from going deeper
                case _:
                    continue
            # Only depth-3 (run) directories reach this point
            try:
                testsuite_run_id: int = int(dirpath.stem)
            except ValueError:
                logger.warning("RunConfigResults: testsuite run ID '%s' should be an integer", dirpath.stem)
                continue
            # cast is safe: a depth-3 dir is always below a recognised testsuite dir
            testsuite_results = cast(TestsuiteResults, testsuite_results)
            try:
                self.run_results.append(testsuite_results.read_results(machine, testsuite_run_id, str(dirpath)))
            except FileNotFoundError:
                pass
            except Exception:
                # Best-effort import: log the parse failure and keep walking
                logger.exception("")

    def _get_runconfig_parser(self, runconfig_path: str) -> configparser.ConfigParser:
        """Read runconfig.ini and verify it has the mandatory main section."""
        runconfig_parser: configparser.ConfigParser = configparser.ConfigParser()
        runconfig_parser.read(runconfig_path)
        if not runconfig_parser.has_section(self._RUNCONFIG_SECTION_HEADER):
            self._raise_parser_error(f"missing the section {self._RUNCONFIG_SECTION_HEADER}")
        return runconfig_parser

    def _raise_parser_error(self, msg: str) -> None:
        """Raise a ValueError prefixed with the runconfig file's path."""
        raise ValueError(self._error_prefix + msg)
class RunConfigResultsFromArgs(RunConfigResults):
    """Runconfig results built from explicit arguments (TestSuiteRunDef entries)
    rather than from a directory tree."""

    def __init__(
        self,
        name: str,
        url: str | None = None,
        result_url_pattern: str = "",
        environment: str | None = None,
        builds: list[str] | None = None,
        tags: list[str] | None = None,
        temporary: bool = False,
        results: list[TestSuiteRunDef] | None = None,
    ) -> None:
        super().__init__(
            name=name,
            url=url,
            result_url_pattern=result_url_pattern,
            environment=environment,
            builds=builds or [],
            tags=tags or [],
            temporary=temporary,
        )
        self._import_results(results or [])

    def _import_results(self, results: list[TestSuiteRunDef]) -> None:
        """Validate and parse every TestSuiteRunDef, rejecting unknown builds and duplicates.

        Raises ValueError for a build missing from the runconfig's build list, a
        build absent from the DB, or a (testsuite, run_id, runconfig, machine)
        combination imported twice.
        """
        self._args_testsuites: dict[str, TestsuiteResults] = {}
        # Keys already imported: (testsuite name, run id, runconfig name, machine)
        testsuite_runs: set[tuple[str, int, str, str]] = set()
        for result in results:
            # Check if the testsuite build is in the list of builds of the runconfig
            if result.testsuite_build not in self.builds:
                raise ValueError(
                    f"The build named '{result.testsuite_build}' is not part of the list of builds of the runconfig"
                )
            try:
                testsuite_result: TestsuiteResults = self._get_or_create_testsuite_results(result)
                self._args_testsuites[result.testsuite_build] = testsuite_result
            except Build.DoesNotExist:
                raise ValueError(f"The build named '{result.testsuite_build}' does not exist")
            # Check if the testsuite run has not been added already
            testsuite_run_key_values: tuple[str, int, str, str] = (
                testsuite_result.name,
                result.testsuite_run_id,
                self.name,
                result.machine,
            )
            if testsuite_run_key_values in testsuite_runs:
                raise ValueError(
                    f"Try to import twice {testsuite_result.name}'s run ID {result.testsuite_run_id} "
                    f"on the runconfig '{self.name}' for the machine '{result.machine}'"
                )
            testsuite_runs.add(testsuite_run_key_values)
            # Import the results
            try:
                self.run_results.append(
                    testsuite_result.read_results(result.machine, result.testsuite_run_id, result.testsuite_run_path)
                )
            except FileNotFoundError:
                pass
            except Exception:
                # Best-effort import: log the parse failure and keep going
                logger.exception("")

    def _get_or_create_testsuite_results(self, result: TestSuiteRunDef) -> TestsuiteResults:
        """Get the testsuite associated to the build name, or create it"""
        testsuite_result: TestsuiteResults | None = self._args_testsuites.get(result.testsuite_build)
        if testsuite_result is None:
            # May raise Build.DoesNotExist, handled by the caller
            testsuite_name = Build.objects.get(name=result.testsuite_build).component.name
            # Create the testsuite results object
            testsuite_result = TestsuiteResults(
                self,
                testsuite_name,
                result.testsuite_build,
                self.result_url_pattern,
                result.results_format,
                result.results_format_version,
            )
        return testsuite_result
449class ResultsCommitHandler:
    def __init__(self, runconfig_results: RunConfigResults):
        # The parsed results that commit() will persist to the database
        self._runconfig_results: RunConfigResults = runconfig_results
453 @transaction.atomic
454 def commit(
455 self,
456 new_machines_public: bool = False,
457 new_tests_public: bool = False,
458 new_machines_vetted: bool = False,
459 new_tests_vetted: bool = False,
460 new_statuses_vetted: bool = False,
461 ) -> None:
462 self._now = timezone.now()
463 self._now = cast(datetime.datetime, self._now)
464 self._fetch_builds_and_tags()
465 self._fetch_or_create_runconfig()
466 self._verify_duplicate_builds()
467 self._add_new_builds()
468 # Abort early if there is nothing else to do
469 if len(self._runconfig_results.run_results) == 0:
470 logger.info("No results to add, exiting...")
471 return
472 self._fetch_existing_testsuite_runs()
473 self._create_missing_machines(new_machines_public, new_machines_vetted)
474 self._create_missing_tests(new_tests_public, new_tests_vetted)
475 self._create_missing_statuses(new_statuses_vetted)
476 self._create_testsuite_runs()
477 self._create_test_results_and_find_failures()
478 self._fetch_issue_filter_associateds()
479 self._map_failures()
480 self._create_new_failures()
481 # Create the statistics objects, but only if the run was not temporary.
482 if not self._runconfig_results.temporary:
483 self._update_statistics_and_corresponding_issues()
484 if len(self._unknown_failures) > 0:
485 self._fetch_archived_issue_filter_associateds()
486 self._match_unknown_failures_to_archived_ifa()
    def _fetch_builds_and_tags(self) -> None:
        """Resolve the runconfig's build and tag names into DB objects (ValueError on unknown names)."""
        self._builds = self._fetch_object_by_names(Build, self._runconfig_results.builds)
        self._tags = self._fetch_object_by_names(RunConfigTag, self._runconfig_results.tags)
    def _fetch_or_create_runconfig(self) -> None:
        """Fetch the RunConfig row by name, creating it (and linking its tags) on first import."""
        self._runconfig, created = RunConfig.objects.get_or_create(
            name=self._runconfig_results.name,
            defaults={
                "url": self._runconfig_results.url,
                "environment": self._runconfig_results.environment,
                "temporary": self._runconfig_results.temporary,
            },
        )
        # Builds already associated with the runconfig in the DB
        self._runconfig_builds: set[Build]
        if created:
            # Add runconfig to all the tags
            for tag in self._tags.values():
                self._runconfig.tags.add(tag)
            # Runconfig builds are empty because we've just created the runconfig
            self._runconfig_builds = set()
            return
        self._runconfig_builds = set(self._runconfig.builds.all())
511 def _verify_duplicate_builds(self) -> None:
512 """Verify that we do not have two builds for the same component"""
513 components: dict[Component, Build] = {}
514 for build in set(self._builds.values()) | self._runconfig_builds:
515 if build.component not in components:
516 components[build.component] = build
517 else:
518 raise ValueError(
519 f"ERROR: Two builds ({build} and {components[build.component]}) "
520 f"cannot be from the same component ({build.component})"
521 )
    def _add_new_builds(self) -> None:
        """Associate with the runconfig every build that it does not already have."""
        for new_build in set(self._builds.values()) - self._runconfig_builds:
            self._runconfig.builds.add(new_build)
    def _fetch_existing_testsuite_runs(self) -> None:
        """Load all the existing testsuite runs on this runconfig"""
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        testsuite_runs_query: QuerySet[TestsuiteRun] = TestsuiteRun.objects.filter(
            runconfig=self._runconfig
        ).prefetch_related("machine__tags", "runconfig__tags", "testsuite")
        # Indexed as [machine name][run id][testsuite name] -> TestsuiteRun
        self._testsuite_runs: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = (
            self._testsuite_runs_to_dict(testsuite_runs_query)
        )
    def _create_missing_machines(self, public: bool, vetted: bool) -> None:
        """Create Machine rows for machines seen in the results but absent from the DB."""
        self._machines: dict[str, Machine] = self._create_missing(
            model=Machine,
            model_str="machine(s)",
            missing_objs=self._runconfig_results.machines,
            key_field="name",
            args={"public": public, "vetted_on": self._now if vetted else None},
        )
    def _create_missing_tests(self, public: bool, vetted: bool) -> None:
        """Create missing Test rows per testsuite; stamp first_runconfig for non-temporary runs."""
        # Indexed as [testsuite name][test name] -> Test
        self._tests: dict[str, dict[str, Test]] = {}
        current_tests_ids: list[int] = []
        for testsuite, tests in self._runconfig_results.tests.items():
            self._tests[testsuite.name] = self._create_missing(
                model=Test,
                model_str=f"test(s) ({testsuite})",
                missing_objs=tests,
                key_field="name",
                args={
                    "public": public,
                    "testsuite": testsuite,
                    "vetted_on": self._now if vetted else None,
                },
                filter={"testsuite": testsuite},
            )
            # If the runconfig is non-temporary, convert the tests
            if not self._runconfig_results.temporary:
                current_tests_ids.extend([test.id for test in self._tests[testsuite.name].values()])
        if current_tests_ids:
            # Record this runconfig as the first one where these tests appeared
            Test.objects.filter(pk__in=current_tests_ids, first_runconfig=None).update(first_runconfig=self._runconfig)
    def _create_missing_statuses(self, vetted: bool) -> None:
        """Create missing TextStatus rows per testsuite for every status string seen."""
        # Indexed as [testsuite name][status name] -> TextStatus
        self._statuses: dict[str, dict[str, TextStatus]] = {}
        for testsuite, text_status in self._runconfig_results.text_statuses.items():
            self._statuses[testsuite.name] = self._create_missing(
                model=TextStatus,
                model_str=f"status(es) ({testsuite})",
                missing_objs=text_status,
                key_field="name",
                args={"testsuite": testsuite, "vetted_on": self._now if vetted else None},
                filter={"testsuite": testsuite},
            )
    def _create_testsuite_runs(self) -> None:
        """Bulk-create TestsuiteRun rows for runs not already in the DB and remember which are new."""
        self._new_run_results: list[TestsuiteRunResults] = []
        to_create: list[TestsuiteRun] = []
        for run_result in self._runconfig_results.run_results:
            # Ignore the runs we already have
            if (
                self._testsuite_runs.get(run_result.machine_name, {})
                .get(run_result.run_id, {})
                .get(run_result.testsuite.name)
            ) is not None:
                continue
            # Create the TestsuiteRun object
            new_testsuite_run: TestsuiteRun = TestsuiteRun(
                testsuite=run_result.testsuite.db_object,
                runconfig=self._runconfig,
                machine=self._machines[run_result.machine_name],
                run_id=run_result.run_id,
                start=run_result.start_time,
                duration=run_result.duration,
            )
            to_create.append(new_testsuite_run)
            self._new_run_results.append(run_result)
        if not (to_create_len := len(to_create)):
            return
        logger.info("Adding %s testsuite runs", to_create_len)
        new_testsuite_runs: list[TestsuiteRun] = TestsuiteRun.objects.bulk_create(to_create)
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        prefetch_related_objects(new_testsuite_runs, "machine__tags", "runconfig__tags", "testsuite")
        # Merge the freshly-created runs into the existing index
        self._testsuite_runs |= self._testsuite_runs_to_dict(new_testsuite_runs)
    def _create_test_results_and_find_failures(self) -> None:
        """Bulk-create TestResult rows for the new runs and collect the failing ones."""
        self._test_results: list[TestResult] = []
        self._failures: list[TestResult] = []
        for run in self._new_run_results:
            for result in run.test_results:
                testsuite = run.testsuite.db_object
                test_result = TestResult(
                    test=self._tests[testsuite.name][result.name],
                    ts_run=self._testsuite_runs[run.machine_name][run.run_id][run.testsuite.name],
                    status=self._statuses[testsuite.name][result.status],
                    start=result.start_time,
                    duration=result.duration,
                    command=result.command,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    dmesg=result.dmesg,
                    url=result.url,
                )
                # Non-model attributes kept on the instance for later reporting
                test_result.machine_name_alias = run.machine_name
                test_result.run_id_alias = run.run_id
                self._test_results.append(test_result)
                if test_result.is_failure:
                    self._failures.append(test_result)
        if test_results_len := len(self._test_results):
            logger.info("Adding %s test results", test_results_len)
            # Batched to keep each INSERT statement at a manageable size
            self._test_results = TestResult.objects.bulk_create(self._test_results, batch_size=5000)
    def _fetch_issue_filter_associateds(self) -> None:
        """Fetch all the associated IssueFilters and their related issues"""
        # Maps each active IssueFilter to all its (non-deleted) associations
        self._ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        active_ifas: QuerySet[IssueFilterAssociated] = IssueFilterAssociated.objects_ready_for_matching.filter(
            deleted_on=None
        ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        for ifa in active_ifas:
            self._ifas[ifa.filter].append(ifa)
    def _map_failures(self) -> None:
        """
        Get the filter statistics already-existing on this runconfig and lock them for update.
        Next, update filter statistics for the issue filter that covers the result.
        """
        start: float = time.time()
        self._known_failures: list[KnownFailure] = []
        self._unknown_failures: list[UnknownFailure] = []
        self._fetch_and_lock_existing_filter_statistics()
        for result in self._test_results:
            # Whether at least one filter matched this (failing) result
            found: bool = False
            for issue_filter in self._ifas:
                # Start matching the result to the current filter
                if issue_filter.covers(result):
                    # Get or create a statistics object for the current filter and runconfig
                    filter_stats: FilterStatsTuple = self._filters_stats_changes.get(
                        issue_filter, FilterStatsTuple(None, None)
                    )
                    if filter_stats.new is None:
                        if filter_stats.current is not None:
                            # Work on a copy so `current` keeps the pre-import counts
                            filter_stats = FilterStatsTuple(filter_stats.current, copy.copy(filter_stats.current))
                            self._filters_stats_changes[issue_filter] = filter_stats
                        else:
                            # No pre-existing stats row: start counting from zero
                            filter_stats = FilterStatsTuple(
                                None,
                                RunFilterStatistic(
                                    filter=issue_filter, runconfig=self._runconfig, matched_count=0, covered_count=0
                                ),
                            )
                            self._filters_stats_changes[issue_filter] = filter_stats
                    filter_stats.new.covered_count += 1
                    # Only failures can match a filter; passing results just count as covered
                    if result not in self._failures:
                        continue
                    if issue_filter.matches(result):
                        filter_stats.new.matched_count += 1
                        # One KnownFailure per association of the matching filter
                        for ifa in self._ifas[issue_filter]:
                            self._known_failures.append(KnownFailure(result=result, matched_ifa=ifa))
                        found = True
            if result in self._failures and not found:
                self._unknown_failures.append(UnknownFailure(result=result))
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s test failures (%s filters matched, %s failures left unmatched) in %.2f ms",
            len(self._failures),
            len(self._known_failures),
            len(self._unknown_failures),
            execution_time_ms,
        )
    def _fetch_and_lock_existing_filter_statistics(self) -> None:
        """Get the filters statistics already-existing on this runconfig and lock them for update"""
        # Stores results as a namedtuple (current, new)
        self._filters_stats_changes: dict[IssueFilter, FilterStatsTuple] = {}
        # Temporary runconfigs never persist statistics, so there is nothing to lock
        if not self._runconfig_results.temporary:
            # select_for_update() row-locks the stats until the enclosing transaction ends
            for stat in RunFilterStatistic.objects.select_for_update().filter(runconfig=self._runconfig):
                self._filters_stats_changes[stat.filter] = FilterStatsTuple(stat, None)
    def _create_new_failures(self) -> None:
        """Persist the known/unknown failures collected by _map_failures()."""
        self._known_failures = KnownFailure.objects.bulk_create(self._known_failures)
        self._unknown_failures = UnknownFailure.objects.bulk_create(self._unknown_failures)
    @transaction.atomic
    def _update_statistics_and_corresponding_issues(self) -> None:
        """Update per-filter statistics, then the statistics of the issues using those filters."""
        # Create a transaction savepoint, in case the updating process fails
        try:
            self._update_statistics()
            self._update_issues()
        except Exception:
            # NOTE(review): catching the exception here keeps it from propagating out of
            # the atomic block, so the savepoint is NOT rolled back on failure -- confirm
            # this best-effort behaviour is intended
            logger.exception("")
    def _update_statistics(self) -> None:
        """
        Save all the stats (WARNING: some might already exist in the DB, hence update_conflict=True in bulk_create)
        """
        # Only the stats that actually changed during this import have a `new` entry
        self._statistics: list[RunFilterStatistic] = [
            filter_stats.new for filter_stats in self._filters_stats_changes.values() if filter_stats.new is not None
        ]
        logger.info("Updating the statistics of %s filters", len(self._statistics))
        start: float = time.time()
        try:
            # Upsert: rows clashing on (runconfig, filter) get their counts updated
            RunFilterStatistic.objects.bulk_create(
                self._statistics,
                update_conflicts=True,
                unique_fields=("runconfig", "filter"),
                update_fields=("covered_count", "matched_count"),
            )
        except Exception:
            # Best-effort: statistics failures must not abort the import
            logger.exception("")
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Filter statistics updated in %.2f ms", execution_time_ms)
    def _update_issues(self) -> None:
        """Get the list of issues that need to be updated because the filters statistics got updated"""
        # NOTE: prefetch data that is later used for updating statistics
        self._issues: list[Issue] = list(
            Issue.objects.select_for_update()
            .filter(filters__in=self._filters_stats_changes.keys())
            .prefetch_related("filters")
        )
        logger.info("Updating the statistics of %s issues", len(self._issues))
        start: float = time.time()
        for issue in self._issues:
            self._issue_simple_stats_recomputing(issue, self._runconfig, self._filters_stats_changes)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Issue statistics updated in %.2f ms", execution_time_ms)
753 def _fetch_archived_issue_filter_associateds(self) -> None:
754 """Fetch all archived IFAs that are less than 6 months old"""
755 self._archived_ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
756 archived_threshold: datetime.datetime = self._now - datetime.timedelta(days=180)
757 # NOTE: prefetch data that is later used for cheking IssueFilter's matching
758 self._db_archived_ifas: QuerySet[IssueFilterAssociated] = (
759 IssueFilterAssociated.objects_ready_for_matching.exclude(
760 deleted_on=None, deleted_on__lt=archived_threshold
761 ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
762 )
763 for archived_ifa in self._db_archived_ifas:
764 self._archived_ifas[archived_ifa.filter].append(archived_ifa)
    def _match_unknown_failures_to_archived_ifa(self) -> None:
        """Try to match the unknown failures with archived IFAs"""
        start: float = time.time()
        filters_matching: set[IssueFilter] = set()
        for failure in self._unknown_failures:
            for issue_filter, ifas in self._archived_ifas.items():
                if issue_filter.matches(failure.result):
                    # Record every association of the matching filter on the failure (M2M add)
                    failure.matched_archived_ifas.add(*ifas)
                    filters_matching.add(issue_filter)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s/%s recently-archived filters matching some unknown failures in %.2f ms",
            len(filters_matching),
            len(self._db_archived_ifas),
            execution_time_ms,
        )
783 @staticmethod
784 def _testsuite_runs_to_dict(
785 testsuite_runs: Iterable[TestsuiteRun],
786 ) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]]:
787 testsuite_runs_dict: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = defaultdict(
788 lambda: defaultdict(dict)
789 )
790 for testsuite_run in testsuite_runs:
791 testsuite_runs_dict[testsuite_run.machine.name][testsuite_run.run_id][testsuite_run.testsuite.name] = (
792 testsuite_run
793 )
794 return testsuite_runs_dict
796 @staticmethod
797 def _fetch_object_by_names(model: Model, names: Iterable[str] | str) -> dict[str, Model]:
798 objects: dict[str, Model] = {}
799 name: str
800 try:
801 if isinstance(names, Iterable):
802 for name in names:
803 objects[name] = model.objects.get(name=name)
804 elif isinstance(names, str):
805 name = names
806 objects[name] = model.objects.get(name=name)
807 except model.DoesNotExist as e:
808 raise ValueError(f"The object {name} does not exist in the database") from e
809 return objects
    @staticmethod
    def _create_missing(
        model: Model,
        model_str: str,
        missing_objs: set,
        key_field: str,
        args: dict[str, Any],
        filter: dict[str, Any] | None = None,
    ) -> dict[Any, Model]:
        """Bulk-create the `model` rows from `missing_objs` that are not in the DB yet.

        Returns a dict mapping each requested key (value of `key_field`) to its
        DB object, mixing freshly-created and pre-existing rows. `filter`
        restricts which existing rows are considered (e.g. per testsuite);
        `args` supplies the constructor kwargs for the created rows.
        """
        if filter is None:
            filter = {}
        # Fetch the current list of objects
        db_objs: list[Model] = list(model.objects.filter(**filter))
        db_objs_key_fields: set = {getattr(obj, key_field) for obj in db_objs}

        # Create the missing objects
        to_create: list[Model] = []
        for obj in missing_objs - db_objs_key_fields:
            # NOTE(review): this mutates the caller-supplied `args` dict in place
            args[key_field] = obj
            to_create.append(model(**args))
        if to_create_len := len(to_create):
            logger.info("Adding %s missing %s", to_create_len, model_str)
            created_objs: list[Model] = model.objects.bulk_create(to_create)
            all_needed_objs: dict[Any, Model] = {
                getattr(created_obj, key_field): created_obj for created_obj in created_objs
            }
        else:
            all_needed_objs = {}

        # Add previously existing objects to the returned dictionary
        for db_obj in db_objs:
            key = getattr(db_obj, key_field)
            if key in missing_objs:
                all_needed_objs[key] = db_obj
        return all_needed_objs
@staticmethod
def _issue_simple_stats_recomputing(
    issue: Issue, runconfig: RunConfig, stats_changes: dict[IssueFilter, FilterStatsTuple]
) -> None:
    """
    Iterate through all the filters associated to this issue and check if a filter's statistic has changed from not
    covered/affected to covered/affected. If so, and if the issue was not already covered/affected, update the
    statistics by directly doing +1 in the relevant stats field. Since we are only adding results, we cannot be in a
    situation where we need to -1 an issue.

    WARNING: This function is tailored for the purpose of adding results!
    DO NOT USE AS A LIGHTWEIGHT REPLACEMENT FOR Issue.update_statistics()
    """
    # State accumulated across all of the issue's filters.
    was_covered: bool = False
    has_new_filter_covering: bool = False
    was_affected: bool = False
    has_new_filter_matched: bool = False
    previous: RunFilterStatistic | None
    new: RunFilterStatistic | None
    for issuefilter in issue.filters.all():
        # Filters absent from stats_changes have had no stats change for this
        # runconfig: both previous and new default to None.
        previous, new = stats_changes.get(issuefilter, FilterStatsTuple(None, None))
        # If we have no previous stats for the filter, just fake empty ones
        if previous is None:
            previous = RunFilterStatistic(filter=issuefilter, runconfig=runconfig, matched_count=0, covered_count=0)
        # Check if the issue was covered/affected for this runconfig before
        if previous.covered_count > 0:
            was_covered = True
        if previous.matched_count > 0:
            was_affected = True
            # OPTIMIZATION: The issue was already affected, so no changes in
            # statistics could come by adding more results
            return
        if new is not None:
            # Transition 0 -> >0 means this filter newly covers / matches the
            # runconfig as a result of the added data.
            if previous.covered_count == 0 and new.covered_count > 0:
                has_new_filter_covering = True
            if previous.matched_count == 0 and new.matched_count > 0:
                has_new_filter_matched = True
    # Update the covered/affected count if necessary
    changed: bool = False
    if not was_covered and has_new_filter_covering:
        issue.runconfigs_covered_count += 1
        changed = True
    if not was_affected and has_new_filter_matched:
        issue.runconfigs_affected_count += 1
        issue.last_seen = runconfig.added_on
        issue.last_seen_runconfig = runconfig
        changed = True
    if changed:
        # Persist only when a counter actually moved, to avoid useless writes.
        issue.save()
@property
@none_if_undefined
def commit_time(self) -> datetime.datetime | None:
    # NOTE(review): backed by self._now despite the name — presumably the
    # timestamp chosen for this import; confirm against where _now is set.
    return self._now
@property
@none_if_undefined
def builds(self) -> dict[str, Build] | None:
    # Read-only accessor for self._builds (str -> Build per the annotation).
    return self._builds
@property
@none_if_undefined
def tags(self) -> dict[str, RunConfigTag] | None:
    # Read-only accessor for self._tags (str -> RunConfigTag per the annotation).
    return self._tags
@property
@none_if_undefined
def runconfig(self) -> RunConfig | None:
    # Read-only accessor for the RunConfig being imported into.
    return self._runconfig
@property
@none_if_undefined
def testsuite_runs(self) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] | None:
    # Nested mapping machine name -> run id -> testsuite name -> TestsuiteRun
    # (same shape as _testsuite_runs_to_dict builds).
    return self._testsuite_runs
@property
@none_if_undefined
def machines(self) -> dict[str, Machine] | None:
    # Read-only accessor for self._machines (str -> Machine per the annotation).
    return self._machines
@property
@none_if_undefined
def tests(self) -> dict[str, dict[str, Test]] | None:
    # Two-level mapping to Test objects; outer/inner key semantics are not
    # visible here — presumably testsuite name then test name (TODO confirm).
    return self._tests
@property
@none_if_undefined
def statuses(self) -> dict[str, dict[str, TextStatus]] | None:
    # Two-level mapping to TextStatus objects; key semantics not visible here
    # — presumably testsuite name then status name (TODO confirm).
    return self._statuses
@property
@none_if_undefined
def test_results(self) -> list[TestResult] | None:
    # Read-only accessor for the full list of collected TestResult objects.
    return self._test_results
@property
@none_if_undefined
def failures(self) -> list[TestResult] | None:
    # Read-only accessor for self._failures (a list of TestResult objects).
    return self._failures
@property
@none_if_undefined
def ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
    # IssueFilterAssociated entries grouped by their IssueFilter.
    return self._ifas
@property
@none_if_undefined
def known_failures(self) -> list[KnownFailure] | None:
    # Read-only accessor for self._known_failures.
    return self._known_failures
@property
@none_if_undefined
def unknown_failures(self) -> list[UnknownFailure] | None:
    # Read-only accessor for self._unknown_failures.
    return self._unknown_failures
@property
@none_if_undefined
def statistics(self) -> list[RunFilterStatistic] | None:
    # Read-only accessor for the per-run filter statistics.
    return self._statistics
@property
@none_if_undefined
def issues(self) -> list[Issue] | None:
    # Read-only accessor for self._issues.
    return self._issues
@property
@none_if_undefined
def archived_ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
    # Archived IssueFilterAssociated entries grouped by their IssueFilter.
    return self._archived_ifas
@cached_property
@none_if_undefined
def test_results_by_machine_and_run(self) -> defaultdict[str, defaultdict[int, list[TestResult]]] | None:
    """Group self._test_results by machine name alias, then by run id.

    Computed once and cached (@cached_property).
    """
    # BUGFIX (annotation only): the inner container is a list of TestResult
    # — the factory is defaultdict(list) and results are append()ed — not a
    # dict[str, TestResult] (stale copy-paste from _testsuite_runs_to_dict).
    test_results: defaultdict[str, defaultdict[int, list[TestResult]]] = defaultdict(lambda: defaultdict(list))
    for test_result in self._test_results:
        test_results[test_result.machine_name_alias][test_result.run_id_alias].append(test_result)
    return test_results