Coverage for CIResults/run_import.py: 87% (588 statements)
coverage.py v7.8.0, created at 2025-05-09 12:54 +0000
from __future__ import annotations

import configparser
import copy
import datetime
import logging
import os
import sys
import time
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Callable, cast

import pytz
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import transaction
from django.db.models import Model, prefetch_related_objects
from django.db.models.query import QuerySet
from django.utils import timezone
from django.utils.functional import cached_property

from CIResults.models import (
    Build,
    Component,
    Issue,
    IssueFilter,
    IssueFilterAssociated,
    KnownFailure,
    Machine,
    RunConfig,
    RunConfigTag,
    RunFilterStatistic,
    Test,
    TestResult,
    TestSuite,
    TestsuiteRun,
    TextStatus,
    UnknownFailure,
)

current_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(current_dir, "piglit"))
from framework import backends as piglit_backends, results as piglit_results  # noqa

# Set up logger
logger = logging.getLogger("run_import")
logger.setLevel("INFO")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(
    logging.Formatter(
        "[{asctime}] [{levelname}:{name}] {message}",
        style="{",
        datefmt="%Y-%m-%d %H:%M",
    )
)
logger.addHandler(stream_handler)


def str_to_list(string: str | None, separator: str = " ") -> list[str]:
    if string is None:
        return []
    return [part.strip() for part in string.split(separator) if len(part) > 0]

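# Usage sketch (hypothetical values): the separator splits the string, empty
# parts are dropped, and None yields an empty list:
#   str_to_list("mesa piglit  igt")  -> ["mesa", "piglit", "igt"]
#   str_to_list(None)                -> []
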
url_validator = URLValidator()


def validate_url(url: str | None) -> str | None:
    if url is None:
        return None
    try:
        url_validator(url)
        return url
    except ValidationError:
        return None

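# Usage sketch (hypothetical values): returns the URL unchanged when Django's
# URLValidator accepts it, and None otherwise:
#   validate_url("https://ci.example.com/run/1")  -> "https://ci.example.com/run/1"
#   validate_url("not-a-url")                     -> None
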
def get_relative_path_depth(path: Path, other_path: Path) -> int:
    return abs(len(path.parts) - len(other_path.parts))

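# Usage sketch: the depth is simply the difference in the number of path
# components, e.g. get_relative_path_depth(Path("/a"), Path("/a/b/c")) -> 2.
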
FilterStatsTuple = namedtuple("FilterStatsTuple", ("current", "new"))


def none_if_undefined(func: Callable[..., Any | None]) -> Callable[..., Any | None]:
    """Helper decorator for safely retrieving class attributes when it is not certain they have been defined yet."""

    def wrapper(*args, **kwargs) -> Any | None:
        try:
            return func(*args, **kwargs)
        except AttributeError:
            return None

    return wrapper

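# Usage sketch (hypothetical class): a property that would normally raise
# AttributeError before the attribute is set instead returns None:
#
#   class Handler:
#       @property
#       @none_if_undefined
#       def result(self):
#           return self._result  # may not exist yet
#
#   Handler().result  -> None
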
@dataclass(frozen=True)
class TestsuiteTestResult:
    name: str
    status: str
    start_time: datetime.datetime
    duration: datetime.timedelta
    command: str | None = None
    stdout: str | None = None
    stderr: str | None = None
    dmesg: str | None = None
    url: str | None = None


@dataclass(frozen=True)
class TestsuiteRunResults:
    testsuite: TestsuiteResults
    machine_name: str
    run_id: int
    test_results: list[TestsuiteTestResult]
    start_time: datetime.datetime
    duration: datetime.timedelta

    @staticmethod
    def get_results_url(testsuite: TestsuiteResults, run_id: int, machine_name: str, test_name: str) -> str | None:
        """Generate and return the test result's external URL, or None if the expanded pattern is not a valid URL"""
        url: str = testsuite.result_url_pattern.format(
            runconfig=testsuite.runconfig.name,
            testsuite_build=testsuite.build,
            run_id=run_id,
            test=test_name,
            machine=machine_name,
        )
        return validate_url(url)

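    # Pattern sketch (hypothetical value): result_url_pattern is expanded with
    # str.format(), so a pattern such as
    #   "https://ci.example.com/{runconfig}/{testsuite_build}/{machine}/{run_id}/{test}"
    # yields one URL per test result; anything that fails URLValidator becomes None.
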
    @cached_property
    def tests_set(self) -> set[str]:
        return {test_result.name for test_result in self.test_results}

    @cached_property
    def statuses_set(self) -> set[str]:
        return {test_result.status for test_result in self.test_results}


class PiglitResult(TestsuiteRunResults):
    def __init__(self, testsuite: TestsuiteResults, machine_name: str, run_id: int, dir_name: str) -> None:
        test_results: list[TestsuiteTestResult] = []
        try:
            results: piglit_results.TestrunResult = piglit_backends.load(dir_name)
            test_duration_sum: datetime.timedelta = datetime.timedelta()
            test_name: str
            test: piglit_results.TestResult
            for test_name, test in results.tests.items():
                url: str | None = self.get_results_url(testsuite, run_id, machine_name, test_name)
                start_time: datetime.datetime = datetime.datetime.fromtimestamp(test.time.start, tz=pytz.utc)
                duration: datetime.timedelta = datetime.timedelta(seconds=test.time.total)
                test_duration_sum += duration
                test_results.append(
                    TestsuiteTestResult(
                        name=test_name,
                        status=test.result,
                        start_time=start_time,
                        duration=duration,
                        command=test.command,
                        stdout=str(test.out),
                        stderr=str(test.err),
                        dmesg=test.dmesg,
                        url=url,
                    )
                )
            start: datetime.datetime = datetime.datetime.fromtimestamp(results.time_elapsed.start, tz=pytz.utc)
            duration = datetime.timedelta(seconds=results.time_elapsed.total)
            # Make sure the total duration is at least as long as the sum of all the test executions
            duration = max(duration, test_duration_sum)
        except pytz.exceptions.Error:
            start = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
            duration = datetime.timedelta()
        super().__init__(testsuite, machine_name, run_id, test_results, start, duration)


class JsonResult(TestsuiteRunResults):
    def __init__(
        self, testsuite: TestsuiteResults, machine_name: str, run_id: int, test_results: list[TestsuiteTestResult]
    ) -> None:
        test_run_duration_sum: datetime.timedelta = datetime.timedelta()
        for test_result in test_results:
            test_run_duration_sum += test_result.duration
        start: datetime.datetime = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
        super().__init__(testsuite, machine_name, run_id, test_results, start, test_run_duration_sum)


@dataclass
class TestsuiteResults:
    runconfig: RunConfigResults
    name: str
    build: str
    result_url_pattern: str
    format: str
    format_version: int | None = None
    db_object: TestSuite = field(init=False)
    _result_type: type[TestsuiteRunResults] = field(init=False)

    def __post_init__(self) -> None:
        # Check if the database contains the build, and then fetch the Testsuite associated with it
        db_build = Build.objects.get(name=self.build)
        self.db_object = TestSuite.objects.get(name=db_build.component)
        self._set_result_type()

    def _set_result_type(self) -> None:
        match self.format:
            case "piglit":
                if self.format_version != 1:
                    raise ValueError(
                        f"The version {self.format_version} of the testsuite result format '{self.format}' "
                        "is unsupported"
                    )
                self._result_type = PiglitResult
            case "json":
                self._result_type = JsonResult
            case _:
                raise ValueError(f"The testsuite result format '{self.format}' is unsupported")

    def read_results(self, *args, **kwargs) -> TestsuiteRunResults:
        if self._result_type not in (PiglitResult, JsonResult):
            raise ValueError(f"This operation is unsupported for the data format '{self.format}'")
        return self._result_type(self, *args, **kwargs)


@dataclass
class TestSuiteRunDef:
    testsuite_build: str
    results_format: str
    results_format_version_raw: InitVar[int | str]
    machine: str
    testsuite_run_id_raw: InitVar[int | str]
    testsuite_run_path: str
    results_format_version: int = field(init=False)
    testsuite_run_id: int = field(init=False)

    def __post_init__(self, results_format_version_raw: int | str, testsuite_run_id_raw: int | str) -> None:
        self.results_format_version = self._to_int("results_format_version_raw", results_format_version_raw)
        self.testsuite_run_id = self._to_int("testsuite_run_id_raw", testsuite_run_id_raw)
        for cls_field in ("testsuite_build", "results_format", "machine", "testsuite_run_id", "testsuite_run_path"):
            if getattr(self, cls_field) is None:
                raise ValueError(f"The parameter {cls_field} cannot be None")

    @staticmethod
    def _to_int(field_name: str, value: int | str) -> int:
        try:
            return int(value)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"The parameter {field_name} '{value}' should be an integer") from exc

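# Construction sketch (hypothetical values): raw versions and run IDs may come
# in as strings and are coerced to int in __post_init__:
#   TestSuiteRunDef(
#       testsuite_build="piglit-build-1234",
#       results_format="piglit",
#       results_format_version_raw="1",
#       machine="test-machine-1",
#       testsuite_run_id_raw="0",
#       testsuite_run_path="/results/myrun/piglit/test-machine-1/0",
#   )
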
@dataclass
class RunConfigResults:
    name: str
    url: str | None = None
    result_url_pattern: str = ""
    environment: str | None = None
    builds: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    temporary: bool = False
    run_results: list[TestsuiteRunResults] = field(default_factory=list)
    testsuites: dict[str, TestsuiteResults] = field(default_factory=dict, init=False)

    def _import_results(self, *args, **kwargs) -> None:
        raise NotImplementedError

    @cached_property
    def tests(self) -> defaultdict[TestSuite, set[str]]:
        tests: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            tests[run.testsuite.db_object] |= run.tests_set
        return tests

    @cached_property
    def machines(self) -> set[str]:
        return {run.machine_name for run in self.run_results}

    @cached_property
    def text_statuses(self) -> defaultdict[TestSuite, set[str]]:
        statuses: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            statuses[run.testsuite.db_object] |= run.statuses_set
        return statuses


class RunConfigResultsFromDir(RunConfigResults):
    _RUNCONFIG_SECTION_HEADER: str = "CIRESULTS_RUNCONFIG"

    def __init__(self, runconfig_dir: str) -> None:
        runconfig_path = os.path.join(runconfig_dir, "runconfig.ini")
        self._error_prefix = f"The RunConfig file {runconfig_path} is invalid: "

        runconfig_parser: configparser.ConfigParser = self._get_runconfig_parser(runconfig_path)
        main_section_proxy: configparser.SectionProxy = runconfig_parser[self._RUNCONFIG_SECTION_HEADER]
        name = main_section_proxy.get("name")
        if name is None:
            self._raise_parser_error("runconfig name unspecified")
        super().__init__(
            name=cast(str, name),
            url=validate_url(main_section_proxy.get("url")),
            result_url_pattern=main_section_proxy.get("result_url_pattern", ""),
            environment=main_section_proxy.get("environment"),
            builds=str_to_list(main_section_proxy.get("builds")),
            tags=str_to_list(main_section_proxy.get("tags")),
            temporary=main_section_proxy.getboolean("temporary", False),
        )

        # Parse the testsuite sections
        for section_name in runconfig_parser.sections():
            # Ignore the main section
            if section_name == self._RUNCONFIG_SECTION_HEADER:
                continue
            build: str = runconfig_parser[section_name]["build"]
            if build not in self.builds:
                self._raise_parser_error(
                    f"The build '{build}' of the testsuite '{section_name}' is not found "
                    f"in the list of builds of the runconfig {self.name}"
                )
            format: str = runconfig_parser[section_name]["format"]
            version: int = runconfig_parser[section_name].getint("version", 1)
            result_url_pattern: str = runconfig_parser[section_name].get("result_url_pattern", "")
            self.testsuites[section_name] = TestsuiteResults(
                self, section_name, build, result_url_pattern, format, version
            )

        self._import_results(runconfig_dir)

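    # Input sketch (hypothetical file): a minimal runconfig.ini that this
    # parser accepts, with one testsuite section per imported testsuite:
    #
    #   [CIRESULTS_RUNCONFIG]
    #   name = myrun-2025-05-09
    #   url = https://ci.example.com/myrun
    #   builds = piglit-build-1234 mesa-build-5678
    #   tags = nightly
    #   temporary = false
    #
    #   [piglit]
    #   build = piglit-build-1234
    #   format = piglit
    #   version = 1
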
    def _import_results(self, runconfig_dir: str) -> None:
        runconfig_dir_path: Path = Path(runconfig_dir)
        testsuite_results: TestsuiteResults | None = None
        for dirpath, dirnames, _ in runconfig_dir_path.walk():
            relative_path_depth = get_relative_path_depth(runconfig_dir_path, dirpath)
            match relative_path_depth:
                case 1:  # Testsuite results directory level
                    if (testsuite_results := self.testsuites.get(dirpath.stem)) is None:
                        dirnames.clear()  # Clear the directories list to stop Path.walk() from going deeper
                        logger.info(
                            "Ignoring the testsuite '%s' because it is not listed in the runconfig file",
                            dirpath.stem,
                        )
                    continue
                case 2:  # Machine directory level
                    machine: str = dirpath.stem
                    continue
                case 3:  # Run directory level
                    dirnames.clear()  # Clear the directories list to stop Path.walk() from going deeper
                case _:
                    continue
            try:
                testsuite_run_id: int = int(dirpath.stem)
            except ValueError:
                logger.warning("RunConfigResults: testsuite run ID '%s' should be an integer", dirpath.stem)
                continue
            testsuite_results = cast(TestsuiteResults, testsuite_results)
            try:
                self.run_results.append(testsuite_results.read_results(machine, testsuite_run_id, str(dirpath)))
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("Failed to read the results in %s", dirpath)

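    # Layout sketch (hypothetical tree): results are discovered by walking
    # <runconfig_dir>/<testsuite>/<machine>/<run_id>/ :
    #
    #   myrun/
    #     runconfig.ini
    #     piglit/                 <- depth 1: testsuite section name
    #       test-machine-1/       <- depth 2: machine name
    #         0/                  <- depth 3: integer run ID, read via read_results()
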
    def _get_runconfig_parser(self, runconfig_path: str) -> configparser.ConfigParser:
        runconfig_parser: configparser.ConfigParser = configparser.ConfigParser()
        runconfig_parser.read(runconfig_path)
        if not runconfig_parser.has_section(self._RUNCONFIG_SECTION_HEADER):
            self._raise_parser_error(f"missing the section {self._RUNCONFIG_SECTION_HEADER}")
        return runconfig_parser

    def _raise_parser_error(self, msg: str) -> None:
        raise ValueError(self._error_prefix + msg)


class RunConfigResultsFromArgs(RunConfigResults):
    def __init__(
        self,
        name: str,
        url: str | None = None,
        result_url_pattern: str = "",
        environment: str | None = None,
        builds: list[str] | None = None,
        tags: list[str] | None = None,
        temporary: bool = False,
        results: list[TestSuiteRunDef] | None = None,
    ) -> None:
        super().__init__(
            name=name,
            url=url,
            result_url_pattern=result_url_pattern,
            environment=environment,
            builds=builds or [],
            tags=tags or [],
            temporary=temporary,
        )
        self._import_results(results or [])

    def _import_results(self, results: list[TestSuiteRunDef]) -> None:
        self._args_testsuites: dict[str, TestsuiteResults] = {}
        testsuite_runs: set[tuple[str, int, str, str]] = set()
        for result in results:
            # Check that the testsuite build is in the list of builds of the runconfig
            if result.testsuite_build not in self.builds:
                raise ValueError(
                    f"The build named '{result.testsuite_build}' is not part of the list of builds of the runconfig"
                )
            try:
                testsuite_result: TestsuiteResults = self._get_or_create_testsuite_results(result)
                self._args_testsuites[result.testsuite_build] = testsuite_result
            except Build.DoesNotExist as exc:
                raise ValueError(f"The build named '{result.testsuite_build}' does not exist") from exc

            # Check that the testsuite run has not been added already
            testsuite_run_key_values: tuple[str, int, str, str] = (
                testsuite_result.name,
                result.testsuite_run_id,
                self.name,
                result.machine,
            )
            if testsuite_run_key_values in testsuite_runs:
                raise ValueError(
                    f"Trying to import twice {testsuite_result.name}'s run ID {result.testsuite_run_id} "
                    f"on the runconfig '{self.name}' for the machine '{result.machine}'"
                )
            testsuite_runs.add(testsuite_run_key_values)

            # Import the results
            try:
                self.run_results.append(
                    testsuite_result.read_results(result.machine, result.testsuite_run_id, result.testsuite_run_path)
                )
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("Failed to read the results in %s", result.testsuite_run_path)

    def _get_or_create_testsuite_results(self, result: TestSuiteRunDef) -> TestsuiteResults:
        """Get the testsuite results associated with the build name, or create them"""
        testsuite_result: TestsuiteResults | None = self._args_testsuites.get(result.testsuite_build)
        if testsuite_result is None:
            testsuite_name = Build.objects.get(name=result.testsuite_build).component.name
            # Create the testsuite results object
            testsuite_result = TestsuiteResults(
                self,
                testsuite_name,
                result.testsuite_build,
                self.result_url_pattern,
                result.results_format,
                result.results_format_version,
            )
        return testsuite_result


class ResultsCommitHandler:
    def __init__(self, runconfig_results: RunConfigResults):
        self._runconfig_results: RunConfigResults = runconfig_results

    @transaction.atomic
    def commit(
        self,
        new_machines_public: bool = False,
        new_tests_public: bool = False,
        new_machines_vetted: bool = False,
        new_tests_vetted: bool = False,
        new_statuses_vetted: bool = False,
    ) -> None:
        self._now: datetime.datetime = timezone.now()
        self._fetch_builds_and_tags()
        self._fetch_or_create_runconfig()
        self._verify_duplicate_builds()
        self._add_new_builds()

        # Abort early if there is nothing else to do
        if len(self._runconfig_results.run_results) == 0:
            logger.info("No results to add, exiting...")
            return

        self._fetch_existing_testsuite_runs()
        self._create_missing_machines(new_machines_public, new_machines_vetted)
        self._create_missing_tests(new_tests_public, new_tests_vetted)
        self._create_missing_statuses(new_statuses_vetted)
        self._create_testsuite_runs()
        self._create_test_results_and_find_failures()
        self._fetch_issue_filter_associateds()
        self._map_failures()
        self._create_new_failures()

        # Create the statistics objects, but only if the run was not temporary
        if not self._runconfig_results.temporary:
            self._update_statistics_and_corresponding_issues()
        if len(self._unknown_failures) > 0:
            self._fetch_archived_issue_filter_associateds()
            self._match_unknown_failures_to_archived_ifa()

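    # Usage sketch (hypothetical paths): parse a results directory and commit
    # everything in one atomic transaction:
    #
    #   results = RunConfigResultsFromDir("/results/myrun")
    #   ResultsCommitHandler(results).commit(new_machines_public=True)
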
    def _fetch_builds_and_tags(self) -> None:
        self._builds = self._fetch_object_by_names(Build, self._runconfig_results.builds)
        self._tags = self._fetch_object_by_names(RunConfigTag, self._runconfig_results.tags)

    def _fetch_or_create_runconfig(self) -> None:
        self._runconfig, created = RunConfig.objects.get_or_create(
            name=self._runconfig_results.name,
            defaults={
                "url": self._runconfig_results.url,
                "environment": self._runconfig_results.environment,
                "temporary": self._runconfig_results.temporary,
            },
        )
        self._runconfig_builds: set[Build]
        if created:
            # Add runconfig to all the tags
            for tag in self._tags.values():
                self._runconfig.tags.add(tag)
            # Runconfig builds are empty because we've just created the runconfig
            self._runconfig_builds = set()
            return
        self._runconfig_builds = set(self._runconfig.builds.all())

    def _verify_duplicate_builds(self) -> None:
        """Verify that we do not have two builds for the same component"""
        components: dict[Component, Build] = {}
        for build in set(self._builds.values()) | self._runconfig_builds:
            if build.component not in components:
                components[build.component] = build
            else:
                raise ValueError(
                    f"ERROR: Two builds ({build} and {components[build.component]}) "
                    f"cannot be from the same component ({build.component})"
                )

    def _add_new_builds(self) -> None:
        for new_build in set(self._builds.values()) - self._runconfig_builds:
            self._runconfig.builds.add(new_build)

    def _fetch_existing_testsuite_runs(self) -> None:
        """Load all the existing testsuite runs on this runconfig"""
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        testsuite_runs_query: QuerySet[TestsuiteRun] = TestsuiteRun.objects.filter(
            runconfig=self._runconfig
        ).prefetch_related("machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = (
            self._testsuite_runs_to_dict(testsuite_runs_query)
        )

    def _create_missing_machines(self, public: bool, vetted: bool) -> None:
        self._machines: dict[str, Machine] = self._create_missing(
            model=Machine,
            model_str="machine(s)",
            missing_objs=self._runconfig_results.machines,
            key_field="name",
            args={"public": public, "vetted_on": self._now if vetted else None},
        )

    def _create_missing_tests(self, public: bool, vetted: bool) -> None:
        self._tests: dict[str, dict[str, Test]] = {}
        current_tests_ids: list[int] = []
        for testsuite, tests in self._runconfig_results.tests.items():
            self._tests[testsuite.name] = self._create_missing(
                model=Test,
                model_str=f"test(s) ({testsuite})",
                missing_objs=tests,
                key_field="name",
                args={
                    "public": public,
                    "testsuite": testsuite,
                    "vetted_on": self._now if vetted else None,
                },
                filter={"testsuite": testsuite},
            )
            # If the runconfig is non-temporary, record it as the first runconfig of any test that has none yet
            if not self._runconfig_results.temporary:
                current_tests_ids.extend([test.id for test in self._tests[testsuite.name].values()])
        if current_tests_ids:
            Test.objects.filter(pk__in=current_tests_ids, first_runconfig=None).update(first_runconfig=self._runconfig)

    def _create_missing_statuses(self, vetted: bool) -> None:
        self._statuses: dict[str, dict[str, TextStatus]] = {}
        for testsuite, text_status in self._runconfig_results.text_statuses.items():
            self._statuses[testsuite.name] = self._create_missing(
                model=TextStatus,
                model_str=f"status(es) ({testsuite})",
                missing_objs=text_status,
                key_field="name",
                args={"testsuite": testsuite, "vetted_on": self._now if vetted else None},
                filter={"testsuite": testsuite},
            )

    def _create_testsuite_runs(self) -> None:
        self._new_run_results: list[TestsuiteRunResults] = []
        to_create: list[TestsuiteRun] = []
        for run_result in self._runconfig_results.run_results:
            # Ignore the runs we already have
            if (
                self._testsuite_runs.get(run_result.machine_name, {})
                .get(run_result.run_id, {})
                .get(run_result.testsuite.name)
            ) is not None:
                continue

            # Create the TestsuiteRun object
            new_testsuite_run: TestsuiteRun = TestsuiteRun(
                testsuite=run_result.testsuite.db_object,
                runconfig=self._runconfig,
                machine=self._machines[run_result.machine_name],
                run_id=run_result.run_id,
                start=run_result.start_time,
                duration=run_result.duration,
            )
            to_create.append(new_testsuite_run)
            self._new_run_results.append(run_result)

        if not (to_create_len := len(to_create)):
            return
        logger.info("Adding %s testsuite runs", to_create_len)
        new_testsuite_runs: list[TestsuiteRun] = TestsuiteRun.objects.bulk_create(to_create)
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        prefetch_related_objects(new_testsuite_runs, "machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs |= self._testsuite_runs_to_dict(new_testsuite_runs)

    def _create_test_results_and_find_failures(self) -> None:
        self._test_results: list[TestResult] = []
        self._failures: list[TestResult] = []
        for run in self._new_run_results:
            for result in run.test_results:
                testsuite = run.testsuite.db_object
                test_result = TestResult(
                    test=self._tests[testsuite.name][result.name],
                    ts_run=self._testsuite_runs[run.machine_name][run.run_id][run.testsuite.name],
                    status=self._statuses[testsuite.name][result.status],
                    start=result.start_time,
                    duration=result.duration,
                    command=result.command,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    dmesg=result.dmesg,
                    url=result.url,
                )
                self._test_results.append(test_result)
                if test_result.is_failure:
                    self._failures.append(test_result)
        if test_results_len := len(self._test_results):
            logger.info("Adding %s test results", test_results_len)
            self._test_results = TestResult.objects.bulk_create(self._test_results, batch_size=5000)

    def _fetch_issue_filter_associateds(self) -> None:
        """Fetch all the associated IssueFilters and their related issues"""
        self._ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        active_ifas: QuerySet[IssueFilterAssociated] = IssueFilterAssociated.objects_ready_for_matching.filter(
            deleted_on=None
        ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        for ifa in active_ifas:
            self._ifas[ifa.filter].append(ifa)

    def _map_failures(self) -> None:
        """
        Match every new test result against the active issue filters: update each filter's coverage/match
        statistics for this runconfig, record matched failures as KnownFailures, and keep the remaining
        failures as UnknownFailures.
        """
        start: float = time.time()
        self._known_failures: list[KnownFailure] = []
        self._unknown_failures: list[UnknownFailure] = []
        self._fetch_and_lock_existing_filter_statistics()
        for result in self._test_results:
            found: bool = False
            for issue_filter in self._ifas:
                # Start matching the result to the current filter
                if issue_filter.covers(result):
                    # Get or create a statistics object for the current filter and runconfig
                    filter_stats: FilterStatsTuple = self._filters_stats_changes.get(
                        issue_filter, FilterStatsTuple(None, None)
                    )
                    if filter_stats.new is None:
                        if filter_stats.current is not None:
                            filter_stats = FilterStatsTuple(filter_stats.current, copy.copy(filter_stats.current))
                        else:
                            filter_stats = FilterStatsTuple(
                                None,
                                RunFilterStatistic(
                                    filter=issue_filter, runconfig=self._runconfig, matched_count=0, covered_count=0
                                ),
                            )
                        self._filters_stats_changes[issue_filter] = filter_stats
                    filter_stats.new.covered_count += 1
                    if result not in self._failures:
                        continue
                    if issue_filter.matches(result):
                        filter_stats.new.matched_count += 1
                        for ifa in self._ifas[issue_filter]:
                            self._known_failures.append(KnownFailure(result=result, matched_ifa=ifa))
                        found = True
            if result in self._failures and not found:
                self._unknown_failures.append(UnknownFailure(result=result))
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s test failures (%s known-failure matches, %s failures left unmatched) in %.2f ms",
            len(self._failures),
            len(self._known_failures),
            len(self._unknown_failures),
            execution_time_ms,
        )

    def _fetch_and_lock_existing_filter_statistics(self) -> None:
        """Get the filter statistics already existing on this runconfig and lock them for update"""
        # Stores the results as FilterStatsTuple namedtuples (current, new)
        self._filters_stats_changes: dict[IssueFilter, FilterStatsTuple] = {}
        if not self._runconfig_results.temporary:
            for stat in RunFilterStatistic.objects.select_for_update().filter(runconfig=self._runconfig):
                self._filters_stats_changes[stat.filter] = FilterStatsTuple(stat, None)

    def _create_new_failures(self) -> None:
        self._known_failures = KnownFailure.objects.bulk_create(self._known_failures)
        self._unknown_failures = UnknownFailure.objects.bulk_create(self._unknown_failures)

    @transaction.atomic
    def _update_statistics_and_corresponding_issues(self) -> None:
        # The nested transaction.atomic creates a savepoint, in case the updating process fails
        try:
            self._update_statistics()
            self._update_issues()
        except Exception:
            logger.exception("Failed to update the filter statistics and the corresponding issues")

    def _update_statistics(self) -> None:
        """
        Save all the stats (WARNING: some might already exist in the DB, hence update_conflicts=True in bulk_create)
        """
        self._statistics: list[RunFilterStatistic] = [
            filter_stats.new for filter_stats in self._filters_stats_changes.values() if filter_stats.new is not None
        ]
        logger.info("Updating the statistics of %s filters", len(self._statistics))
        start: float = time.time()
        try:
            RunFilterStatistic.objects.bulk_create(
                self._statistics,
                update_conflicts=True,
                unique_fields=("runconfig", "filter"),
                update_fields=("covered_count", "matched_count"),
            )
        except Exception:
            logger.exception("Failed to save the filter statistics")
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Filter statistics updated in %.2f ms", execution_time_ms)

    def _update_issues(self) -> None:
        """Get the list of issues that need to be updated because their filters' statistics got updated"""
        # NOTE: prefetch data that is later used for updating the statistics
        self._issues: list[Issue] = list(
            Issue.objects.select_for_update()
            .filter(filters__in=self._filters_stats_changes.keys())
            .prefetch_related("filters")
        )
        logger.info("Updating the statistics of %s issues", len(self._issues))
        start: float = time.time()
        for issue in self._issues:
            self._issue_simple_stats_recomputing(issue, self._runconfig, self._filters_stats_changes)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Issue statistics updated in %.2f ms", execution_time_ms)

    def _fetch_archived_issue_filter_associateds(self) -> None:
        """Fetch all the archived IFAs that are less than 6 months old"""
        self._archived_ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        archived_threshold: datetime.datetime = self._now - datetime.timedelta(days=180)
        # NOTE: prefetch data that is later used for checking IssueFilter matching.
        # Exclude the non-archived IFAs first, then the ones archived before the threshold.
        self._db_archived_ifas: QuerySet[IssueFilterAssociated] = (
            IssueFilterAssociated.objects_ready_for_matching.exclude(deleted_on=None)
            .exclude(deleted_on__lt=archived_threshold)
            .prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        )
        for archived_ifa in self._db_archived_ifas:
            self._archived_ifas[archived_ifa.filter].append(archived_ifa)

    def _match_unknown_failures_to_archived_ifa(self) -> None:
        """Try to match the unknown failures with the archived IFAs"""
        start: float = time.time()
        filters_matching: set[IssueFilter] = set()
        for failure in self._unknown_failures:
            for issue_filter, ifas in self._archived_ifas.items():
                if issue_filter.matches(failure.result):
                    failure.matched_archived_ifas.add(*ifas)
                    filters_matching.add(issue_filter)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s/%s recently-archived filters matching some unknown failures in %.2f ms",
            len(filters_matching),
            len(self._db_archived_ifas),
            execution_time_ms,
        )

    @staticmethod
    def _testsuite_runs_to_dict(
        testsuite_runs: Iterable[TestsuiteRun],
    ) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]]:
        testsuite_runs_dict: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = defaultdict(
            lambda: defaultdict(dict)
        )
        for testsuite_run in testsuite_runs:
            testsuite_runs_dict[testsuite_run.machine.name][testsuite_run.run_id][testsuite_run.testsuite.name] = (
                testsuite_run
            )
        return testsuite_runs_dict

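    # Shape sketch (hypothetical keys): the nesting is machine name -> run ID
    # -> testsuite name, e.g.
    #   {"test-machine-1": {0: {"piglit": <TestsuiteRun>}}}
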
    @staticmethod
    def _fetch_object_by_names(model: type[Model], names: Iterable[str] | str) -> dict[str, Model]:
        objects: dict[str, Model] = {}
        name: str
        try:
            # Check for a plain string first, since strings are also Iterable
            if isinstance(names, str):
                name = names
                objects[name] = model.objects.get(name=name)
            else:
                for name in names:
                    objects[name] = model.objects.get(name=name)
        except model.DoesNotExist as e:
            raise ValueError(f"The object {name} does not exist in the database") from e
        return objects

    @staticmethod
    def _create_missing(
        model: type[Model],
        model_str: str,
        missing_objs: set,
        key_field: str,
        args: dict[str, Any],
        filter: dict[str, Any] | None = None,
    ) -> dict[Any, Model]:
        if filter is None:
            filter = {}
        # Fetch the current list of objects
        db_objs: list[Model] = list(model.objects.filter(**filter))
        db_objs_key_fields: set = {getattr(obj, key_field) for obj in db_objs}

        # Create the missing objects
        to_create: list[Model] = []
        for obj in missing_objs - db_objs_key_fields:
            args[key_field] = obj
            to_create.append(model(**args))
        if to_create_len := len(to_create):
            logger.info("Adding %s missing %s", to_create_len, model_str)
            created_objs: list[Model] = model.objects.bulk_create(to_create)
            all_needed_objs: dict[Any, Model] = {
                getattr(created_obj, key_field): created_obj for created_obj in created_objs
            }
        else:
            all_needed_objs = {}

        # Add the previously existing objects to the returned dictionary
        for db_obj in db_objs:
            key = getattr(db_obj, key_field)
            if key in missing_objs:
                all_needed_objs[key] = db_obj
        return all_needed_objs

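    # Call sketch (hypothetical values): create any Machine rows missing from
    # the database and get back a name-keyed dict of all the requested rows:
    #   self._create_missing(
    #       model=Machine,
    #       model_str="machine(s)",
    #       missing_objs={"test-machine-1", "test-machine-2"},
    #       key_field="name",
    #       args={"public": False, "vetted_on": None},
    #   )
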
    @staticmethod
    def _issue_simple_stats_recomputing(
        issue: Issue, runconfig: RunConfig, stats_changes: dict[IssueFilter, FilterStatsTuple]
    ) -> None:
        """
        Iterate through all the filters associated with this issue and check whether a filter's statistic has
        changed from not covered/affected to covered/affected. If so, and if the issue was not already
        covered/affected, update the statistics by simply incrementing the relevant stats field. Since we are
        only adding results, we can never end up in a situation where a counter needs to be decremented.

        WARNING: This function is tailored for the purpose of adding results!
        DO NOT USE AS A LIGHTWEIGHT REPLACEMENT FOR Issue.update_statistics()
        """
        was_covered: bool = False
        has_new_filter_covering: bool = False
        was_affected: bool = False
        has_new_filter_matched: bool = False
        previous: RunFilterStatistic | None
        new: RunFilterStatistic | None
        for issuefilter in issue.filters.all():
            previous, new = stats_changes.get(issuefilter, FilterStatsTuple(None, None))
            # If we have no previous stats for the filter, just fake empty ones
            if previous is None:
                previous = RunFilterStatistic(filter=issuefilter, runconfig=runconfig, matched_count=0, covered_count=0)

            # Check if the issue was covered/affected for this runconfig before
            if previous.covered_count > 0:
                was_covered = True
            if previous.matched_count > 0:
                was_affected = True
                # OPTIMIZATION: The issue was already affected, so no changes in
                # statistics could come by adding more results
                return

            if new is not None:
                if previous.covered_count == 0 and new.covered_count > 0:
                    has_new_filter_covering = True
                if previous.matched_count == 0 and new.matched_count > 0:
                    has_new_filter_matched = True

        # Update the covered/affected count if necessary
        changed: bool = False
        if not was_covered and has_new_filter_covering:
            issue.runconfigs_covered_count += 1
            changed = True
        if not was_affected and has_new_filter_matched:
            issue.runconfigs_affected_count += 1
            issue.last_seen = runconfig.added_on
            issue.last_seen_runconfig = runconfig
            changed = True
        if changed:
            issue.save()

    @property
    @none_if_undefined
    def commit_time(self) -> datetime.datetime | None:
        return self._now

    @property
    @none_if_undefined
    def builds(self) -> dict[str, Build] | None:
        return self._builds

    @property
    @none_if_undefined
    def tags(self) -> dict[str, RunConfigTag] | None:
        return self._tags

    @property
    @none_if_undefined
    def runconfig(self) -> RunConfig | None:
        return self._runconfig

    @property
    @none_if_undefined
    def testsuite_runs(self) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] | None:
        return self._testsuite_runs

    @property
    @none_if_undefined
    def machines(self) -> dict[str, Machine] | None:
        return self._machines

    @property
    @none_if_undefined
    def tests(self) -> dict[str, dict[str, Test]] | None:
        return self._tests

    @property
    @none_if_undefined
    def statuses(self) -> dict[str, dict[str, TextStatus]] | None:
        return self._statuses

    @property
    @none_if_undefined
    def test_results(self) -> list[TestResult] | None:
        return self._test_results

    @property
    @none_if_undefined
    def failures(self) -> list[TestResult] | None:
        return self._failures

    @property
    @none_if_undefined
    def ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._ifas

    @property
    @none_if_undefined
    def known_failures(self) -> list[KnownFailure] | None:
        return self._known_failures

    @property
    @none_if_undefined
    def unknown_failures(self) -> list[UnknownFailure] | None:
        return self._unknown_failures

    @property
    @none_if_undefined
    def statistics(self) -> list[RunFilterStatistic] | None:
        return self._statistics

    @property
    @none_if_undefined
    def issues(self) -> list[Issue] | None:
        return self._issues

    @property
    @none_if_undefined
    def archived_ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._archived_ifas