Coverage for CIResults/run_import.py: 88%

597 statements

coverage.py v7.10.2, created at 2025-08-06 08:12 +0000

from __future__ import annotations

import configparser
import copy
import datetime
import logging
import os
import sys
import time
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Callable, cast

import pytz
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import transaction
from django.db.models import Model, prefetch_related_objects
from django.db.models.query import QuerySet
from django.utils import timezone
from django.utils.functional import cached_property

from CIResults.models import (
    Build,
    Component,
    Issue,
    IssueFilter,
    IssueFilterAssociated,
    KnownFailure,
    Machine,
    RunConfig,
    RunConfigTag,
    RunFilterStatistic,
    Test,
    TestResult,
    TestSuite,
    TestsuiteRun,
    TextStatus,
    UnknownFailure,
)

current_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(current_dir, "piglit"))
from framework import backends as piglit_backends, results as piglit_results  # noqa

# Set up logger
logger = logging.getLogger("run_import")
logger.setLevel("INFO")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(
    logging.Formatter(
        "[{asctime}] [{levelname}:{name}] {message}",
        style="{",
        datefmt="%Y-%m-%d %H:%M",
    )
)
logger.addHandler(stream_handler)
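
# Illustrative output of this configuration (the timestamp is assumed): a call
# such as logger.info("Adding %s testsuite runs", 2) is written to stdout as
#   [2025-08-06 08:12] [INFO:run_import] Adding 2 testsuite runs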



def str_to_list(string: str | None, separator: str = " ") -> list:
    if string is None:
        return []
    return [part.strip() for part in string.split(separator) if len(part) > 0]
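
# Illustrative behaviour (example values, not from the source):
#   str_to_list("a b  c")   -> ["a", "b", "c"]   (empty parts are dropped)
#   str_to_list(None)       -> []
#   str_to_list("a,b", ",") -> ["a", "b"]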



url_validator = URLValidator()


def validate_url(url: str | None) -> str | None:
    if url is None:
        return None
    try:
        url_validator(url)
        return url
    except ValidationError:
        return None
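
# Illustrative behaviour (example values assumed): a URL accepted by Django's
# URLValidator is returned unchanged, anything else yields None:
#   validate_url("https://ci.example.com/run/1") -> "https://ci.example.com/run/1"
#   validate_url("not-a-url")                    -> None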



def get_relative_path_depth(path: Path, other_path: Path) -> int:
    return abs(len(path.parts) - len(other_path.parts))
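
# Illustrative example (paths assumed): the depth is the difference in the
# number of path components, e.g.
#   get_relative_path_depth(Path("/results"), Path("/results/suite/machine")) == 2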



FilterStatsTuple = namedtuple("FilterStatsTuple", ("current", "new"))


def none_if_undefined(func: Callable[..., Any | None]) -> Callable[..., Any | None]:
    """Helper decorator for safely retrieving class attributes when it is not certain that they have been defined yet."""

    def wrapper(*args, **kwargs) -> Any | None:
        try:
            return func(*args, **kwargs)
        except AttributeError:
            return None

    return wrapper
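
# Minimal usage sketch (the Example class is hypothetical, not part of this
# module): reading a property whose backing attribute has not been set yet
# returns None instead of raising AttributeError.
#
#   class Example:
#       @property
#       @none_if_undefined
#       def value(self):
#           return self._value  # may not have been assigned yet
#
#   Example().value  # -> None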



@dataclass(frozen=True)
class TestsuiteTestResult:
    name: str
    status: str
    start_time: datetime.datetime
    duration: datetime.timedelta
    command: str | None = None
    stdout: str | None = None
    stderr: str | None = None
    dmesg: str | None = None
    url: str | None = None


@dataclass(frozen=True)
class TestsuiteRunResults:
    testsuite: TestsuiteResults
    machine_name: str
    run_id: int
    test_results: list[TestsuiteTestResult]
    start_time: datetime.datetime
    duration: datetime.timedelta

    @staticmethod
    def get_results_url(testsuite: TestsuiteResults, run_id: int, machine_name: str, test_name: str) -> str | None:
        """Generate and return the test result's external URL"""
        url: str = testsuite.result_url_pattern.format(
            runconfig=testsuite.runconfig.name,
            testsuite_build=testsuite.build,
            run_id=run_id,
            test=test_name,
            machine=machine_name,
        )
        return validate_url(url)
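
    # Illustrative substitution, assuming a hypothetical pattern such as
    # "https://ci.example.com/{runconfig}/{machine}/{run_id}/{test}": for
    # runconfig "nightly-42", machine "tgl-1", run_id 0 and test "basic@smoke",
    # the formatted URL is "https://ci.example.com/nightly-42/tgl-1/0/basic@smoke";
    # validate_url() then discards any expansion that is not a valid URL.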


    @cached_property
    def tests_set(self) -> set[str]:
        return {test_result.name for test_result in self.test_results}

    @cached_property
    def statuses_set(self) -> set[str]:
        return {test_result.status for test_result in self.test_results}


class PiglitResult(TestsuiteRunResults):
    def __init__(self, testsuite: TestsuiteResults, machine_name: str, run_id: int, dir_name: str) -> None:
        test_results: list[TestsuiteTestResult] = []
        try:
            results: piglit_results.TestrunResult = piglit_backends.load(dir_name)
            test_duration_sum: datetime.timedelta = datetime.timedelta()
            test_name: str
            test: piglit_results.TestResult
            for test_name, test in results.tests.items():
                url: str | None = self.get_results_url(testsuite, run_id, machine_name, test_name)
                start_time: datetime.datetime = datetime.datetime.fromtimestamp(test.time.start, tz=pytz.utc)
                duration: datetime.timedelta = datetime.timedelta(seconds=test.time.total)
                test_duration_sum += duration
                test_results.append(
                    TestsuiteTestResult(
                        name=test_name,
                        status=test.result,
                        start_time=start_time,
                        duration=duration,
                        command=test.command,
                        stdout=str(test.out),
                        stderr=str(test.err),
                        dmesg=test.dmesg,
                        url=url,
                    )
                )
            start: datetime.datetime = datetime.datetime.fromtimestamp(results.time_elapsed.start, tz=pytz.utc)
            duration = datetime.timedelta(seconds=results.time_elapsed.total)
            # Make sure the total duration is at least as long as the sum of all the test executions
            duration = max(duration, test_duration_sum)
        except pytz.exceptions.Error:
            start = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
            duration = datetime.timedelta()
        super().__init__(testsuite, machine_name, run_id, test_results, start, duration)


class JsonResult(TestsuiteRunResults):
    def __init__(
        self, testsuite: TestsuiteResults, machine_name: str, run_id: int, test_results: list[TestsuiteTestResult]
    ) -> None:
        test_run_duration_sum: datetime.timedelta = datetime.timedelta()
        for test_result in test_results:
            test_run_duration_sum += test_result.duration
        start: datetime.datetime = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
        super().__init__(testsuite, machine_name, run_id, test_results, start, test_run_duration_sum)


@dataclass
class TestsuiteResults:
    runconfig: RunConfigResults
    name: str
    build: str
    result_url_pattern: str
    format: str
    format_version: int | None = None
    db_object: TestSuite = field(init=False)
    _result_type: type[TestsuiteRunResults] = field(init=False)

    def __post_init__(self) -> None:
        # Check that the database contains the build, then fetch the TestSuite associated with it
        db_build = Build.objects.get(name=self.build)
        self.db_object = TestSuite.objects.get(name=db_build.component)
        self._set_result_type()

    def _set_result_type(self) -> None:
        match self.format:
            case "piglit":
                if self.format_version != 1:
                    raise ValueError(
                        f"The version {self.format_version} of the testsuite result format '{self.format}' "
                        "is unsupported"
                    )
                self._result_type = PiglitResult
            case "json":
                self._result_type = JsonResult
            case _:
                raise ValueError(f"The testsuite result format '{self.format}' is unsupported")

    def read_results(self, *args, **kwargs) -> TestsuiteRunResults:
        if self._result_type not in (PiglitResult, JsonResult):
            raise ValueError(f"This operation is unsupported for the data format '{self.format}'")
        return self._result_type(self, *args, **kwargs)
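
    # Hedged usage sketch (argument values assumed): for the "piglit" format the
    # extra arguments are those of PiglitResult.__init__, e.g.
    #   runs = testsuite_results.read_results("tgl-1", 0, "/results/piglit/tgl-1/0")
    # while the "json" format takes a pre-parsed list of TestsuiteTestResult
    # objects instead of a directory name.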



@dataclass
class TestSuiteRunDef:
    testsuite_build: str
    results_format: str
    results_format_version_raw: InitVar[int | str]
    machine: str
    testsuite_run_id_raw: InitVar[int | str]
    testsuite_run_path: str
    results_format_version: int = field(init=False)
    testsuite_run_id: int = field(init=False)

    def __post_init__(self, results_format_version_raw: int | str, testsuite_run_id_raw: int | str) -> None:
        self.results_format_version = self._to_int("results_format_version_raw", results_format_version_raw)
        self.testsuite_run_id = self._to_int("testsuite_run_id_raw", testsuite_run_id_raw)
        for cls_field in ("testsuite_build", "results_format", "machine", "testsuite_run_id", "testsuite_run_path"):
            if getattr(self, cls_field) is None:
                raise ValueError(f"The parameter {cls_field} cannot be None")

    @staticmethod
    def _to_int(field_name: str, value: int | str) -> int:
        try:
            return int(value)
        except Exception:
            raise ValueError(f"The parameter {field_name} '{value}' should be an integer")
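
    # Illustrative construction (all values assumed): the *_raw fields accept
    # strings, as they typically arrive from command-line arguments, and are
    # coerced to int in __post_init__.
    #
    #   TestSuiteRunDef(
    #       testsuite_build="piglit-build-1",
    #       results_format="piglit",
    #       results_format_version_raw="1",
    #       machine="tgl-1",
    #       testsuite_run_id_raw="0",
    #       testsuite_run_path="/results/piglit/tgl-1/0",
    #   )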



@dataclass
class RunConfigResults:
    name: str
    url: str | None = None
    result_url_pattern: str = ""
    environment: str | None = None
    builds: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    temporary: bool = False
    run_results: list[TestsuiteRunResults] = field(default_factory=list)
    testsuites: dict[str, TestsuiteResults] = field(default_factory=dict, init=False)

    def _import_results(self, *args, **kwargs) -> None:
        raise NotImplementedError

    @cached_property
    def tests(self) -> defaultdict[TestSuite, set[str]]:
        tests: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            tests[run.testsuite.db_object] |= run.tests_set
        return tests

    @cached_property
    def machines(self) -> set[str]:
        return {run.machine_name for run in self.run_results}

    @cached_property
    def text_statuses(self) -> defaultdict[TestSuite, set[str]]:
        statuses: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            statuses[run.testsuite.db_object] |= run.statuses_set
        return statuses


class RunConfigResultsFromDir(RunConfigResults):
    _RUNCONFIG_SECTION_HEADER: str = "CIRESULTS_RUNCONFIG"

    def __init__(self, runconfig_dir: str) -> None:
        runconfig_path = os.path.join(runconfig_dir, "runconfig.ini")
        self._error_prefix = f"The RunConfig file {runconfig_path} is invalid: "

        runconfig_parser: configparser.ConfigParser = self._get_runconfig_parser(runconfig_path)
        main_section_proxy: configparser.SectionProxy = runconfig_parser[self._RUNCONFIG_SECTION_HEADER]
        name = main_section_proxy.get("name")
        if name is None:
            self._raise_parser_error("runconfig name unspecified")
        super().__init__(
            name=cast(str, name),
            url=validate_url(main_section_proxy.get("url")),
            result_url_pattern=main_section_proxy.get("result_url_pattern", ""),
            environment=main_section_proxy.get("environment"),
            builds=str_to_list(main_section_proxy.get("builds")),
            tags=str_to_list(main_section_proxy.get("tags")),
            temporary=main_section_proxy.getboolean("temporary", False),
        )
        # Parse the testsuite sections
        for section_name in runconfig_parser.sections():
            # Ignore the main section
            if section_name == self._RUNCONFIG_SECTION_HEADER:
                continue
            build: str = runconfig_parser[section_name]["build"]
            if build not in self.builds:
                self._raise_parser_error(
                    f"The build '{build}' of the testsuite '{section_name}' is not found "
                    f"in the list of builds of the runconfig {self.name}"
                )
            format: str = runconfig_parser[section_name]["format"]
            version: int = runconfig_parser[section_name].getint("version", 1)
            result_url_pattern: str = runconfig_parser[section_name].get("result_url_pattern", "")
            self.testsuites[section_name] = TestsuiteResults(
                self, section_name, build, result_url_pattern, format, version
            )
        self._import_results(runconfig_dir)
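
    # Illustrative runconfig.ini, reconstructed from the parsing logic above
    # (section and key names are real, all values are assumed):
    #
    #   [CIRESULTS_RUNCONFIG]
    #   name = nightly-42
    #   url = https://ci.example.com/nightly-42
    #   result_url_pattern = https://ci.example.com/{runconfig}/{machine}/{run_id}/{test}
    #   builds = piglit-build-1 kernel-build-7
    #   tags = nightly
    #   temporary = false
    #
    #   [piglit]
    #   build = piglit-build-1
    #   format = piglit
    #   version = 1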


    def _import_results(self, runconfig_dir: str) -> None:
        runconfig_dir_path: Path = Path(runconfig_dir)
        testsuite_results: TestsuiteResults | None = None
        for dirpath, dirnames, _ in runconfig_dir_path.walk():
            relative_path_depth = get_relative_path_depth(runconfig_dir_path, dirpath)
            match relative_path_depth:
                case 1:  # Testsuite results directory level
                    if (testsuite_results := self.testsuites.get(dirpath.stem)) is None:
                        dirnames.clear()  # Clearing the directories list stops Path.walk() from going deeper
                        logger.info(
                            "Ignoring the testsuite '%s' because it is not listed in the runconfig file",
                            dirpath.stem,
                        )
                    continue
                case 2:  # Machine directory level
                    machine: str = dirpath.stem
                    continue
                case 3:  # Run directory level
                    dirnames.clear()  # Clearing the directories list stops Path.walk() from going deeper
                case _:
                    continue
            try:
                testsuite_run_id: int = int(dirpath.stem)
            except ValueError:
                logger.warning("RunConfigResults: testsuite run ID '%s' should be an integer", dirpath.stem)
                continue
            testsuite_results = cast(TestsuiteResults, testsuite_results)
            try:
                self.run_results.append(testsuite_results.read_results(machine, testsuite_run_id, str(dirpath)))
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("")
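
    # Expected on-disk layout, reconstructed from the depth-based matching above
    # (directory names are assumed); depth 1 selects the testsuite, depth 2 the
    # machine, and depth 3 the run ID:
    #
    #   <runconfig_dir>/
    #       runconfig.ini
    #       piglit/          # must match a testsuite section of runconfig.ini
    #           tgl-1/       # machine name
    #               0/       # integer run ID, contains the results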


    def _get_runconfig_parser(self, runconfig_path: str) -> configparser.ConfigParser:
        runconfig_parser: configparser.ConfigParser = configparser.ConfigParser()
        runconfig_parser.read(runconfig_path)
        if not runconfig_parser.has_section(self._RUNCONFIG_SECTION_HEADER):
            self._raise_parser_error(f"missing the section {self._RUNCONFIG_SECTION_HEADER}")
        return runconfig_parser

    def _raise_parser_error(self, msg: str) -> None:
        raise ValueError(self._error_prefix + msg)


class RunConfigResultsFromArgs(RunConfigResults):
    def __init__(
        self,
        name: str,
        url: str | None = None,
        result_url_pattern: str = "",
        environment: str | None = None,
        builds: list[str] | None = None,
        tags: list[str] | None = None,
        temporary: bool = False,
        results: list[TestSuiteRunDef] | None = None,
    ) -> None:
        super().__init__(
            name=name,
            url=url,
            result_url_pattern=result_url_pattern,
            environment=environment,
            builds=builds or [],
            tags=tags or [],
            temporary=temporary,
        )
        self._import_results(results or [])

    def _import_results(self, results: list[TestSuiteRunDef]) -> None:
        self._args_testsuites: dict[str, TestsuiteResults] = {}
        testsuite_runs: set[tuple[str, int, str, str]] = set()
        for result in results:
            # Check that the testsuite build is in the list of builds of the runconfig
            if result.testsuite_build not in self.builds:
                raise ValueError(
                    f"The build named '{result.testsuite_build}' is not part of the list of builds of the runconfig"
                )
            try:
                testsuite_result: TestsuiteResults = self._get_or_create_testsuite_results(result)
                self._args_testsuites[result.testsuite_build] = testsuite_result
            except Build.DoesNotExist:
                raise ValueError(f"The build named '{result.testsuite_build}' does not exist")
            # Check that the testsuite run has not been added already
            testsuite_run_key_values: tuple[str, int, str, str] = (
                testsuite_result.name,
                result.testsuite_run_id,
                self.name,
                result.machine,
            )
            if testsuite_run_key_values in testsuite_runs:
                raise ValueError(
                    f"Tried to import {testsuite_result.name}'s run ID {result.testsuite_run_id} twice "
                    f"on the runconfig '{self.name}' for the machine '{result.machine}'"
                )
            testsuite_runs.add(testsuite_run_key_values)
            # Import the results
            try:
                self.run_results.append(
                    testsuite_result.read_results(result.machine, result.testsuite_run_id, result.testsuite_run_path)
                )
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("")

    def _get_or_create_testsuite_results(self, result: TestSuiteRunDef) -> TestsuiteResults:
        """Get the testsuite associated with the build name, or create it"""
        testsuite_result: TestsuiteResults | None = self._args_testsuites.get(result.testsuite_build)
        if testsuite_result is None:
            testsuite_name = Build.objects.get(name=result.testsuite_build).component.name
            # Create the testsuite results object
            testsuite_result = TestsuiteResults(
                self,
                testsuite_name,
                result.testsuite_build,
                self.result_url_pattern,
                result.results_format,
                result.results_format_version,
            )
        return testsuite_result



class ResultsCommitHandler:
    def __init__(self, runconfig_results: RunConfigResults):
        self._runconfig_results: RunConfigResults = runconfig_results

    @transaction.atomic
    def commit(
        self,
        new_machines_public: bool = False,
        new_tests_public: bool = False,
        new_machines_vetted: bool = False,
        new_tests_vetted: bool = False,
        new_statuses_vetted: bool = False,
    ) -> None:
        self._now = timezone.now()
        self._now = cast(datetime.datetime, self._now)
        self._fetch_builds_and_tags()
        self._fetch_or_create_runconfig()
        self._verify_duplicate_builds()
        self._add_new_builds()
        # Abort early if there is nothing else to do
        if len(self._runconfig_results.run_results) == 0:
            logger.info("No results to add, exiting...")
            return
        self._fetch_existing_testsuite_runs()
        self._create_missing_machines(new_machines_public, new_machines_vetted)
        self._create_missing_tests(new_tests_public, new_tests_vetted)
        self._create_missing_statuses(new_statuses_vetted)
        self._create_testsuite_runs()
        self._create_test_results_and_find_failures()
        self._fetch_issue_filter_associateds()
        self._map_failures()
        self._create_new_failures()
        # Create the statistics objects, but only if the run was not temporary.
        if not self._runconfig_results.temporary:
            self._update_statistics_and_corresponding_issues()
        if len(self._unknown_failures) > 0:
            self._fetch_archived_issue_filter_associateds()
            self._match_unknown_failures_to_archived_ifa()

    def _fetch_builds_and_tags(self) -> None:
        self._builds = self._fetch_object_by_names(Build, self._runconfig_results.builds)
        self._tags = self._fetch_object_by_names(RunConfigTag, self._runconfig_results.tags)

    def _fetch_or_create_runconfig(self) -> None:
        self._runconfig, created = RunConfig.objects.get_or_create(
            name=self._runconfig_results.name,
            defaults={
                "url": self._runconfig_results.url,
                "environment": self._runconfig_results.environment,
                "temporary": self._runconfig_results.temporary,
            },
        )
        self._runconfig_builds: set[Build]
        if created:
            # Add the runconfig to all the tags
            for tag in self._tags.values():
                self._runconfig.tags.add(tag)
            # The runconfig builds are empty because we have just created the runconfig
            self._runconfig_builds = set()
            return
        self._runconfig_builds = set(self._runconfig.builds.all())

    def _verify_duplicate_builds(self) -> None:
        """Verify that we do not have two builds for the same component"""
        components: dict[Component, Build] = {}
        for build in set(self._builds.values()) | self._runconfig_builds:
            if build.component not in components:
                components[build.component] = build
            else:
                raise ValueError(
                    f"ERROR: Two builds ({build} and {components[build.component]}) "
                    f"cannot be from the same component ({build.component})"
                )

    def _add_new_builds(self) -> None:
        for new_build in set(self._builds.values()) - self._runconfig_builds:
            self._runconfig.builds.add(new_build)

    def _fetch_existing_testsuite_runs(self) -> None:
        """Load all the existing testsuite runs on this runconfig"""
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        testsuite_runs_query: QuerySet[TestsuiteRun] = TestsuiteRun.objects.filter(
            runconfig=self._runconfig
        ).prefetch_related("machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = (
            self._testsuite_runs_to_dict(testsuite_runs_query)
        )

    def _create_missing_machines(self, public: bool, vetted: bool) -> None:
        self._machines: dict[str, Machine] = self._create_missing(
            model=Machine,
            model_str="machine(s)",
            missing_objs=self._runconfig_results.machines,
            key_field="name",
            args={"public": public, "vetted_on": self._now if vetted else None},
        )

    def _create_missing_tests(self, public: bool, vetted: bool) -> None:
        self._tests: dict[str, dict[str, Test]] = {}
        current_tests_ids: list[int] = []
        for testsuite, tests in self._runconfig_results.tests.items():
            self._tests[testsuite.name] = self._create_missing(
                model=Test,
                model_str=f"test(s) ({testsuite})",
                missing_objs=tests,
                key_field="name",
                args={
                    "public": public,
                    "testsuite": testsuite,
                    "vetted_on": self._now if vetted else None,
                },
                filter={"testsuite": testsuite},
            )
            # If the runconfig is non-temporary, record it as the first runconfig of the new tests
            if not self._runconfig_results.temporary:
                current_tests_ids.extend([test.id for test in self._tests[testsuite.name].values()])
        if current_tests_ids:
            Test.objects.filter(pk__in=current_tests_ids, first_runconfig=None).update(first_runconfig=self._runconfig)

    def _create_missing_statuses(self, vetted: bool) -> None:
        self._statuses: dict[str, dict[str, TextStatus]] = {}
        for testsuite, text_status in self._runconfig_results.text_statuses.items():
            self._statuses[testsuite.name] = self._create_missing(
                model=TextStatus,
                model_str=f"status(es) ({testsuite})",
                missing_objs=text_status,
                key_field="name",
                args={"testsuite": testsuite, "vetted_on": self._now if vetted else None},
                filter={"testsuite": testsuite},
            )

    def _create_testsuite_runs(self) -> None:
        self._new_run_results: list[TestsuiteRunResults] = []
        to_create: list[TestsuiteRun] = []
        for run_result in self._runconfig_results.run_results:
            # Ignore the runs we already have
            if (
                self._testsuite_runs.get(run_result.machine_name, {})
                .get(run_result.run_id, {})
                .get(run_result.testsuite.name)
            ) is not None:
                continue
            # Create the TestsuiteRun object
            new_testsuite_run: TestsuiteRun = TestsuiteRun(
                testsuite=run_result.testsuite.db_object,
                runconfig=self._runconfig,
                machine=self._machines[run_result.machine_name],
                run_id=run_result.run_id,
                start=run_result.start_time,
                duration=run_result.duration,
            )
            to_create.append(new_testsuite_run)
            self._new_run_results.append(run_result)
        if not (to_create_len := len(to_create)):
            return
        logger.info("Adding %s testsuite runs", to_create_len)
        new_testsuite_runs: list[TestsuiteRun] = TestsuiteRun.objects.bulk_create(to_create)
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        prefetch_related_objects(new_testsuite_runs, "machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs |= self._testsuite_runs_to_dict(new_testsuite_runs)

    def _create_test_results_and_find_failures(self) -> None:
        self._test_results: list[TestResult] = []
        self._failures: list[TestResult] = []
        for run in self._new_run_results:
            for result in run.test_results:
                testsuite = run.testsuite.db_object
                test_result = TestResult(
                    test=self._tests[testsuite.name][result.name],
                    ts_run=self._testsuite_runs[run.machine_name][run.run_id][run.testsuite.name],
                    status=self._statuses[testsuite.name][result.status],
                    start=result.start_time,
                    duration=result.duration,
                    command=result.command,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    dmesg=result.dmesg,
                    url=result.url,
                )
                test_result.machine_name_alias = run.machine_name
                test_result.run_id_alias = run.run_id
                self._test_results.append(test_result)
                if test_result.is_failure:
                    self._failures.append(test_result)
        if test_results_len := len(self._test_results):
            logger.info("Adding %s test results", test_results_len)
            self._test_results = TestResult.objects.bulk_create(self._test_results, batch_size=5000)

    def _fetch_issue_filter_associateds(self) -> None:
        """Fetch all the associated IssueFilters and their related issues"""
        self._ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        # NOTE: prefetch data that is later used for checking IssueFilter coverage
        active_ifas: QuerySet[IssueFilterAssociated] = IssueFilterAssociated.objects_ready_for_matching.filter(
            deleted_on=None
        ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        for ifa in active_ifas:
            self._ifas[ifa.filter].append(ifa)


    def _map_failures(self) -> None:
        """
        Get the filter statistics already existing on this runconfig and lock them for update.
        Then update the filter statistics for every issue filter that covers a result.
        """
        start: float = time.time()
        self._known_failures: list[KnownFailure] = []
        self._unknown_failures: list[UnknownFailure] = []
        self._fetch_and_lock_existing_filter_statistics()
        for result in self._test_results:
            found: bool = False
            for issue_filter in self._ifas:
                # Start matching the result to the current filter
                if issue_filter.covers(result):
                    # Get or create a statistics object for the current filter and runconfig
                    filter_stats: FilterStatsTuple = self._filters_stats_changes.get(
                        issue_filter, FilterStatsTuple(None, None)
                    )
                    if filter_stats.new is None:
                        if filter_stats.current is not None:
                            filter_stats = FilterStatsTuple(filter_stats.current, copy.copy(filter_stats.current))
                            self._filters_stats_changes[issue_filter] = filter_stats
                        else:
                            filter_stats = FilterStatsTuple(
                                None,
                                RunFilterStatistic(
                                    filter=issue_filter, runconfig=self._runconfig, matched_count=0, covered_count=0
                                ),
                            )
                            self._filters_stats_changes[issue_filter] = filter_stats
                    filter_stats.new.covered_count += 1
                    if result not in self._failures:
                        continue
                    if issue_filter.matches(result):
                        filter_stats.new.matched_count += 1
                        for ifa in self._ifas[issue_filter]:
                            self._known_failures.append(KnownFailure(result=result, matched_ifa=ifa))
                        found = True
            if result in self._failures and not found:
                self._unknown_failures.append(UnknownFailure(result=result))
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s test failures (%s filters matched, %s failures left unmatched) in %.2f ms",
            len(self._failures),
            len(self._known_failures),
            len(self._unknown_failures),
            execution_time_ms,
        )

    def _fetch_and_lock_existing_filter_statistics(self) -> None:
        """Get the filter statistics already existing on this runconfig and lock them for update"""
        # Stores the results as a namedtuple (current, new)
        self._filters_stats_changes: dict[IssueFilter, FilterStatsTuple] = {}
        if not self._runconfig_results.temporary:
            for stat in RunFilterStatistic.objects.select_for_update().filter(runconfig=self._runconfig):
                self._filters_stats_changes[stat.filter] = FilterStatsTuple(stat, None)


    def _create_new_failures(self) -> None:
        self._known_failures = KnownFailure.objects.bulk_create(self._known_failures)
        self._unknown_failures = UnknownFailure.objects.bulk_create(self._unknown_failures)

    @transaction.atomic
    def _update_statistics_and_corresponding_issues(self) -> None:
        # Create a transaction savepoint, in case the updating process fails
        try:
            self._update_statistics()
            self._update_issues()
        except Exception:
            logger.exception("")

    def _update_statistics(self) -> None:
        """
        Save all the stats (WARNING: some might already exist in the DB, hence update_conflicts=True in bulk_create)
        """
        self._statistics: list[RunFilterStatistic] = [
            filter_stats.new for filter_stats in self._filters_stats_changes.values() if filter_stats.new is not None
        ]
        logger.info("Updating the statistics of %s filters", len(self._statistics))
        start: float = time.time()
        try:
            RunFilterStatistic.objects.bulk_create(
                self._statistics,
                update_conflicts=True,
                unique_fields=("runconfig", "filter"),
                update_fields=("covered_count", "matched_count"),
            )
        except Exception:
            logger.exception("")
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Filter statistics updated in %.2f ms", execution_time_ms)

    def _update_issues(self) -> None:
        """Get the list of issues that need to be updated because the filter statistics got updated"""
        # NOTE: prefetch data that is later used for updating statistics
        self._issues: list[Issue] = list(
            Issue.objects.select_for_update()
            .filter(filters__in=self._filters_stats_changes.keys())
            .prefetch_related("filters")
        )
        logger.info("Updating the statistics of %s issues", len(self._issues))
        start: float = time.time()
        for issue in self._issues:
            self._issue_simple_stats_recomputing(issue, self._runconfig, self._filters_stats_changes)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Issue statistics updated in %.2f ms", execution_time_ms)

    def _fetch_archived_issue_filter_associateds(self) -> None:
        """Fetch all the archived IFAs that are less than 6 months old"""
        self._archived_ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        archived_threshold: datetime.datetime = self._now - datetime.timedelta(days=180)
        # NOTE: prefetch data that is later used for checking IssueFilter matching
        self._db_archived_ifas: QuerySet[IssueFilterAssociated] = (
            IssueFilterAssociated.objects_ready_for_matching.exclude(
                deleted_on=None, deleted_on__lt=archived_threshold
            ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        )
        for archived_ifa in self._db_archived_ifas:
            self._archived_ifas[archived_ifa.filter].append(archived_ifa)

    def _match_unknown_failures_to_archived_ifa(self) -> None:
        """Try to match the unknown failures with archived IFAs"""
        start: float = time.time()
        filters_matching: set[IssueFilter] = set()
        for failure in self._unknown_failures:
            for issue_filter, ifas in self._archived_ifas.items():
                if issue_filter.matches(failure.result):
                    failure.matched_archived_ifas.add(*ifas)
                    filters_matching.add(issue_filter)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s/%s recently-archived filters matching some unknown failures in %.2f ms",
            len(filters_matching),
            len(self._db_archived_ifas),
            execution_time_ms,
        )

    @staticmethod
    def _testsuite_runs_to_dict(
        testsuite_runs: Iterable[TestsuiteRun],
    ) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]]:
        testsuite_runs_dict: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = defaultdict(
            lambda: defaultdict(dict)
        )
        for testsuite_run in testsuite_runs:
            testsuite_runs_dict[testsuite_run.machine.name][testsuite_run.run_id][testsuite_run.testsuite.name] = (
                testsuite_run
            )
        return testsuite_runs_dict
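
    # Resulting shape (illustrative names): testsuite_runs_dict["tgl-1"][0]["piglit"]
    # is the TestsuiteRun of testsuite "piglit", run 0, on machine "tgl-1".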


    @staticmethod
    def _fetch_object_by_names(model: Model, names: Iterable[str] | str) -> dict[str, Model]:
        objects: dict[str, Model] = {}
        name: str
        try:
            # NOTE: check for str first, since a str is itself an Iterable of characters
            if isinstance(names, str):
                name = names
                objects[name] = model.objects.get(name=name)
            elif isinstance(names, Iterable):
                for name in names:
                    objects[name] = model.objects.get(name=name)
        except model.DoesNotExist as e:
            raise ValueError(f"The object {name} does not exist in the database") from e
        return objects

    @staticmethod
    def _create_missing(
        model: Model,
        model_str: str,
        missing_objs: set,
        key_field: str,
        args: dict[str, Any],
        filter: dict[str, Any] | None = None,
    ) -> dict[Any, Model]:
        if filter is None:
            filter = {}
        # Fetch the current list of objects
        db_objs: list[Model] = list(model.objects.filter(**filter))
        db_objs_key_fields: set = {getattr(obj, key_field) for obj in db_objs}

        # Create the missing objects
        to_create: list[Model] = []
        for obj in missing_objs - db_objs_key_fields:
            args[key_field] = obj
            to_create.append(model(**args))
        if to_create_len := len(to_create):
            logger.info("Adding %s missing %s", to_create_len, model_str)
            created_objs: list[Model] = model.objects.bulk_create(to_create)
            all_needed_objs: dict[Any, Model] = {
                getattr(created_obj, key_field): created_obj for created_obj in created_objs
            }
        else:
            all_needed_objs = {}

        # Add the previously existing objects to the returned dictionary
        for db_obj in db_objs:
            key = getattr(db_obj, key_field)
            if key in missing_objs:
                all_needed_objs[key] = db_obj
        return all_needed_objs

    @staticmethod
    def _issue_simple_stats_recomputing(
        issue: Issue, runconfig: RunConfig, stats_changes: dict[IssueFilter, FilterStatsTuple]
    ) -> None:
        """
        Iterate through all the filters associated with this issue and check whether a filter's statistics have
        changed from not covered/affected to covered/affected. If so, and if the issue was not already
        covered/affected, update the statistics by adding 1 directly to the relevant stats field. Since we are only
        adding results, we can never be in a situation where we need to subtract 1 from an issue.

        WARNING: This function is tailored for the purpose of adding results!
        DO NOT USE AS A LIGHTWEIGHT REPLACEMENT FOR Issue.update_statistics()
        """
        was_covered: bool = False
        has_new_filter_covering: bool = False
        was_affected: bool = False
        has_new_filter_matched: bool = False
        previous: RunFilterStatistic | None
        new: RunFilterStatistic | None
        for issuefilter in issue.filters.all():
            previous, new = stats_changes.get(issuefilter, FilterStatsTuple(None, None))
            # If we have no previous stats for the filter, just fake empty ones
            if previous is None:
                previous = RunFilterStatistic(filter=issuefilter, runconfig=runconfig, matched_count=0, covered_count=0)
            # Check if the issue was covered/affected for this runconfig before
            if previous.covered_count > 0:
                was_covered = True
            if previous.matched_count > 0:
                was_affected = True
                # OPTIMIZATION: the issue was already affected, so adding more
                # results cannot change the statistics
                return
            if new is not None:
                if previous.covered_count == 0 and new.covered_count > 0:
                    has_new_filter_covering = True
                if previous.matched_count == 0 and new.matched_count > 0:
                    has_new_filter_matched = True
        # Update the covered/affected count if necessary
        changed: bool = False
        if not was_covered and has_new_filter_covering:
            issue.runconfigs_covered_count += 1
            changed = True
        if not was_affected and has_new_filter_matched:
            issue.runconfigs_affected_count += 1
            issue.last_seen = runconfig.added_on
            issue.last_seen_runconfig = runconfig
            changed = True
        if changed:
            issue.save()

    @property
    @none_if_undefined
    def commit_time(self) -> datetime.datetime | None:
        return self._now

    @property
    @none_if_undefined
    def builds(self) -> dict[str, Build] | None:
        return self._builds

    @property
    @none_if_undefined
    def tags(self) -> dict[str, RunConfigTag] | None:
        return self._tags

    @property
    @none_if_undefined
    def runconfig(self) -> RunConfig | None:
        return self._runconfig

    @property
    @none_if_undefined
    def testsuite_runs(self) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] | None:
        return self._testsuite_runs

    @property
    @none_if_undefined
    def machines(self) -> dict[str, Machine] | None:
        return self._machines

    @property
    @none_if_undefined
    def tests(self) -> dict[str, dict[str, Test]] | None:
        return self._tests

    @property
    @none_if_undefined
    def statuses(self) -> dict[str, dict[str, TextStatus]] | None:
        return self._statuses

    @property
    @none_if_undefined
    def test_results(self) -> list[TestResult] | None:
        return self._test_results

    @property
    @none_if_undefined
    def failures(self) -> list[TestResult] | None:
        return self._failures

    @property
    @none_if_undefined
    def ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._ifas

    @property
    @none_if_undefined
    def known_failures(self) -> list[KnownFailure] | None:
        return self._known_failures

    @property
    @none_if_undefined
    def unknown_failures(self) -> list[UnknownFailure] | None:
        return self._unknown_failures

    @property
    @none_if_undefined
    def statistics(self) -> list[RunFilterStatistic] | None:
        return self._statistics

    @property
    @none_if_undefined
    def issues(self) -> list[Issue] | None:
        return self._issues

    @property
    @none_if_undefined
    def archived_ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._archived_ifas

    @cached_property
    @none_if_undefined
    def test_results_by_machine_and_run(self) -> defaultdict[str, defaultdict[int, list[TestResult]]] | None:
        test_results: defaultdict[str, defaultdict[int, list[TestResult]]] = defaultdict(lambda: defaultdict(list))
        for test_result in self._test_results:
            test_results[test_result.machine_name_alias][test_result.run_id_alias].append(test_result)
        return test_results