Coverage for CIResults/run_import.py: 87%

588 statements  

coverage.py v7.8.0, created at 2025-05-09 12:54 +0000

from __future__ import annotations

import configparser
import copy
import datetime
import logging
import os
import sys
import time
from collections import defaultdict, namedtuple
from collections.abc import Iterable
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Callable, cast

import pytz
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.db import transaction
from django.db.models import Model, prefetch_related_objects
from django.db.models.query import QuerySet
from django.utils import timezone
from django.utils.functional import cached_property

from CIResults.models import (
    Build,
    Component,
    Issue,
    IssueFilter,
    IssueFilterAssociated,
    KnownFailure,
    Machine,
    RunConfig,
    RunConfigTag,
    RunFilterStatistic,
    Test,
    TestResult,
    TestSuite,
    TestsuiteRun,
    TextStatus,
    UnknownFailure,
)

current_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(current_dir, "piglit"))
from framework import backends as piglit_backends, results as piglit_results  # noqa

# Set up logger
logger = logging.getLogger("run_import")
logger.setLevel("INFO")
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(
    logging.Formatter(
        "[{asctime}] [{levelname}:{name}] {message}",
        style="{",
        datefmt="%Y-%m-%d %H:%M",
    )
)
logger.addHandler(stream_handler)


def str_to_list(string: str | None, separator: str = " ") -> list:
    if string is None:
        return []
    return [part.strip() for part in string.split(separator) if len(part) > 0]
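
# Illustrative behaviour of str_to_list (example values are made up, not from the module):
#   str_to_list("build-a build-b ")  ->  ["build-a", "build-b"]
#   str_to_list(None)                ->  []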


url_validator = URLValidator()


def validate_url(url: str | None) -> str | None:
    if url is None:
        return None
    try:
        url_validator(url)
        return url
    except ValidationError:
        return None


def get_relative_path_depth(path: Path, other_path: Path) -> int:
    return abs(len(path.parts) - len(other_path.parts))


# A (current, new) pair of RunFilterStatistic objects: "current" holds the statistics already in
# the database, "new" the updated copy (see _fetch_and_lock_existing_filter_statistics())
FilterStatsTuple = namedtuple("FilterStatsTuple", ("current", "new"))


def none_if_undefined(func: Callable[..., Any | None]) -> Callable[..., Any | None]:
    """Helper decorator for safely retrieving class attributes that may not have been defined yet."""

    def wrapper(*args, **kwargs) -> Any | None:
        try:
            return func(*args, **kwargs)
        except AttributeError:
            return None

    return wrapper
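
# Illustrative use of none_if_undefined (hypothetical class, not from this module):
# reading Example().value before _value has been assigned returns None instead of
# raising AttributeError.
#
#   class Example:
#       @property
#       @none_if_undefined
#       def value(self) -> int | None:
#           return self._value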


@dataclass(frozen=True)
class TestsuiteTestResult:
    name: str
    status: str
    start_time: datetime.datetime
    duration: datetime.timedelta
    command: str | None = None
    stdout: str | None = None
    stderr: str | None = None
    dmesg: str | None = None
    url: str | None = None


@dataclass(frozen=True)
class TestsuiteRunResults:
    testsuite: TestsuiteResults
    machine_name: str
    run_id: int
    test_results: list[TestsuiteTestResult]
    start_time: datetime.datetime
    duration: datetime.timedelta

    @staticmethod
    def get_results_url(testsuite: TestsuiteResults, run_id: int, machine_name: str, test_name: str) -> str | None:
        """Generate and return the test result's external URL, or None if the generated URL is invalid"""
        url: str = testsuite.result_url_pattern.format(
            runconfig=testsuite.runconfig.name,
            testsuite_build=testsuite.build,
            run_id=run_id,
            test=test_name,
            machine=machine_name,
        )
        return validate_url(url)
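
    # Example result_url_pattern (hypothetical URL; the placeholder names come from the format() call above):
    #   "https://ci.example.com/{runconfig}/{testsuite_build}/{machine}/{run_id}/{test}"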

    @cached_property
    def tests_set(self) -> set[str]:
        return {test_result.name for test_result in self.test_results}

    @cached_property
    def statuses_set(self) -> set[str]:
        return {test_result.status for test_result in self.test_results}


class PiglitResult(TestsuiteRunResults):
    def __init__(self, testsuite: TestsuiteResults, machine_name: str, run_id: int, dir_name: str) -> None:
        test_results: list[TestsuiteTestResult] = []
        try:
            results: piglit_results.TestrunResult = piglit_backends.load(dir_name)
            test_duration_sum: datetime.timedelta = datetime.timedelta()
            test_name: str
            test: piglit_results.TestResult
            for test_name, test in results.tests.items():
                url: str | None = self.get_results_url(testsuite, run_id, machine_name, test_name)
                start_time: datetime.datetime = datetime.datetime.fromtimestamp(test.time.start, tz=pytz.utc)
                duration: datetime.timedelta = datetime.timedelta(seconds=test.time.total)
                test_duration_sum += duration
                test_results.append(
                    TestsuiteTestResult(
                        name=test_name,
                        status=test.result,
                        start_time=start_time,
                        duration=duration,
                        command=test.command,
                        stdout=str(test.out),
                        stderr=str(test.err),
                        dmesg=test.dmesg,
                        url=url,
                    )
                )
            start: datetime.datetime = datetime.datetime.fromtimestamp(results.time_elapsed.start, tz=pytz.utc)
            duration = datetime.timedelta(seconds=results.time_elapsed.total)
            # Make sure the total duration is at least as long as the sum of all the test executions
            duration = max(duration, test_duration_sum)
        except pytz.exceptions.Error:
            start = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
            duration = datetime.timedelta()
        super().__init__(testsuite, machine_name, run_id, test_results, start, duration)


class JsonResult(TestsuiteRunResults):
    def __init__(
        self, testsuite: TestsuiteResults, machine_name: str, run_id: int, test_results: list[TestsuiteTestResult]
    ) -> None:
        test_run_duration_sum: datetime.timedelta = datetime.timedelta()
        for test_result in test_results:
            test_run_duration_sum += test_result.duration
        start: datetime.datetime = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
        super().__init__(testsuite, machine_name, run_id, test_results, start, test_run_duration_sum)


@dataclass
class TestsuiteResults:
    runconfig: RunConfigResults
    name: str
    build: str
    result_url_pattern: str
    format: str
    format_version: int | None = None
    db_object: TestSuite = field(init=False)
    _result_type: type[TestsuiteRunResults] = field(init=False)

    def __post_init__(self) -> None:
        # Check if the database contains the build, and then fetch the Testsuite associated with it
        db_build = Build.objects.get(name=self.build)
        self.db_object = TestSuite.objects.get(name=db_build.component)
        self._set_result_type()

    def _set_result_type(self) -> None:
        match self.format:
            case "piglit":
                if self.format_version != 1:
                    raise ValueError(
                        f"The version {self.format_version} of the testsuite result format '{self.format}' "
                        "is unsupported"
                    )
                self._result_type = PiglitResult
            case "json":
                self._result_type = JsonResult
            case _:
                raise ValueError(f"The testsuite result format '{self.format}' is unsupported")

    def read_results(self, *args, **kwargs) -> TestsuiteRunResults:
        if self._result_type not in (PiglitResult, JsonResult):
            raise ValueError(f"This operation is unsupported for the data format '{self.format}'")
        return self._result_type(self, *args, **kwargs)


@dataclass
class TestSuiteRunDef:
    testsuite_build: str
    results_format: str
    results_format_version_raw: InitVar[int | str]
    machine: str
    testsuite_run_id_raw: InitVar[int | str]
    testsuite_run_path: str
    results_format_version: int = field(init=False)
    testsuite_run_id: int = field(init=False)

    def __post_init__(self, results_format_version_raw: int | str, testsuite_run_id_raw: int | str) -> None:
        self.results_format_version = self._to_int("results_format_version_raw", results_format_version_raw)
        self.testsuite_run_id = self._to_int("testsuite_run_id_raw", testsuite_run_id_raw)
        for cls_field in ("testsuite_build", "results_format", "machine", "testsuite_run_id", "testsuite_run_path"):
            if getattr(self, cls_field) is None:
                raise ValueError(f"The parameter {cls_field} cannot be None")

    @staticmethod
    def _to_int(field_name: str, value: int | str) -> int:
        try:
            return int(value)
        except Exception:
            raise ValueError(f"The parameter {field_name} '{value}' should be an integer")
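
# Example construction of a TestSuiteRunDef (all values are made up; the *_raw fields accept str or int):
#   TestSuiteRunDef(testsuite_build="build-a", results_format="piglit", results_format_version_raw="1",
#                   machine="machine-1", testsuite_run_id_raw=0, testsuite_run_path="/results/run0")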


@dataclass
class RunConfigResults:
    name: str
    url: str | None = None
    result_url_pattern: str = ""
    environment: str | None = None
    builds: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
    temporary: bool = False
    run_results: list[TestsuiteRunResults] = field(default_factory=list)
    testsuites: dict[str, TestsuiteResults] = field(default_factory=dict, init=False)

    def _import_results(self, *args, **kwargs) -> None:
        raise NotImplementedError

    @cached_property
    def tests(self) -> defaultdict[TestSuite, set[str]]:
        tests: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            tests[run.testsuite.db_object] |= run.tests_set
        return tests

    @cached_property
    def machines(self) -> set[str]:
        return {run.machine_name for run in self.run_results}

    @cached_property
    def text_statuses(self) -> defaultdict[TestSuite, set[str]]:
        statuses: defaultdict[TestSuite, set[str]] = defaultdict(set)
        for run in self.run_results:
            statuses[run.testsuite.db_object] |= run.statuses_set
        return statuses


class RunConfigResultsFromDir(RunConfigResults):
    _RUNCONFIG_SECTION_HEADER: str = "CIRESULTS_RUNCONFIG"
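
    # Expected on-disk layout and a sample runconfig.ini (an illustrative sketch inferred from
    # the parsing code below; every name and value here is made up):
    #
    #   <runconfig_dir>/runconfig.ini
    #   <runconfig_dir>/<testsuite>/<machine>/<run_id>/   <- walked by _import_results()
    #
    #   [CIRESULTS_RUNCONFIG]
    #   name = CI_RUN_1234
    #   url = https://ci.example.com/runs/1234
    #   result_url_pattern = https://ci.example.com/{runconfig}/{machine}/{run_id}/{test}
    #   environment = example-env
    #   builds = build-a build-b
    #   tags = nightly
    #   temporary = no
    #
    #   [my-testsuite]
    #   build = build-a
    #   format = piglit
    #   version = 1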

    def __init__(self, runconfig_dir: str) -> None:
        runconfig_path = os.path.join(runconfig_dir, "runconfig.ini")
        self._error_prefix = f"The RunConfig file {runconfig_path} is invalid: "

        runconfig_parser: configparser.ConfigParser = self._get_runconfig_parser(runconfig_path)
        main_section_proxy: configparser.SectionProxy = runconfig_parser[self._RUNCONFIG_SECTION_HEADER]
        name = main_section_proxy.get("name")
        if name is None:
            self._raise_parser_error("runconfig name unspecified")
        super().__init__(
            name=cast(str, name),
            url=validate_url(main_section_proxy.get("url")),
            result_url_pattern=main_section_proxy.get("result_url_pattern", ""),
            environment=main_section_proxy.get("environment"),
            builds=str_to_list(main_section_proxy.get("builds")),
            tags=str_to_list(main_section_proxy.get("tags")),
            temporary=main_section_proxy.getboolean("temporary", False),
        )
        # Parse the testsuite sections
        for section_name in runconfig_parser.sections():
            # Ignore the main section
            if section_name == self._RUNCONFIG_SECTION_HEADER:
                continue
            build: str = runconfig_parser[section_name]["build"]
            if build not in self.builds:
                self._raise_parser_error(
                    f"The build '{build}' of the testsuite '{section_name}' is not found "
                    f"in the list of builds of the runconfig {self.name}"
                )
            format: str = runconfig_parser[section_name]["format"]
            version: int = runconfig_parser[section_name].getint("version", 1)
            result_url_pattern: str = runconfig_parser[section_name].get("result_url_pattern", "")
            self.testsuites[section_name] = TestsuiteResults(
                self, section_name, build, result_url_pattern, format, version
            )
        self._import_results(runconfig_dir)


    def _import_results(self, runconfig_dir: str) -> None:
        runconfig_dir_path: Path = Path(runconfig_dir)
        testsuite_results: TestsuiteResults | None = None
        for dirpath, dirnames, _ in runconfig_dir_path.walk():
            relative_path_depth = get_relative_path_depth(runconfig_dir_path, dirpath)
            match relative_path_depth:
                case 1:  # Testsuite results directory level
                    if (testsuite_results := self.testsuites.get(dirpath.stem)) is None:
                        dirnames.clear()  # Clearing the directories list stops Path.walk() from going deeper
                        logger.info(
                            "Ignoring the testsuite '%s' because it is not listed in the runconfig file",
                            dirpath.stem,
                        )
                    continue
                case 2:  # Machine directory level
                    machine: str = dirpath.stem
                    continue
                case 3:  # Run directory level
                    dirnames.clear()  # Clearing the directories list stops Path.walk() from going deeper
                case _:
                    continue
            try:
                testsuite_run_id: int = int(dirpath.stem)
            except ValueError:
                logger.warning("RunConfigResults: testsuite run ID '%s' should be an integer", dirpath.stem)
                continue
            testsuite_results = cast(TestsuiteResults, testsuite_results)
            try:
                self.run_results.append(testsuite_results.read_results(machine, testsuite_run_id, str(dirpath)))
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("Unexpected error while reading the results in '%s'", dirpath)


    def _get_runconfig_parser(self, runconfig_path: str) -> configparser.ConfigParser:
        runconfig_parser: configparser.ConfigParser = configparser.ConfigParser()
        runconfig_parser.read(runconfig_path)
        if not runconfig_parser.has_section(self._RUNCONFIG_SECTION_HEADER):
            self._raise_parser_error(f"missing the section {self._RUNCONFIG_SECTION_HEADER}")
        return runconfig_parser

    def _raise_parser_error(self, msg: str) -> None:
        raise ValueError(self._error_prefix + msg)


class RunConfigResultsFromArgs(RunConfigResults):
    def __init__(
        self,
        name: str,
        url: str | None = None,
        result_url_pattern: str = "",
        environment: str | None = None,
        builds: list[str] | None = None,
        tags: list[str] | None = None,
        temporary: bool = False,
        results: list[TestSuiteRunDef] | None = None,
    ) -> None:
        super().__init__(
            name=name,
            url=url,
            result_url_pattern=result_url_pattern,
            environment=environment,
            builds=builds or [],
            tags=tags or [],
            temporary=temporary,
        )
        self._import_results(results or [])


    def _import_results(self, results: list[TestSuiteRunDef]) -> None:
        self._args_testsuites: dict[str, TestsuiteResults] = {}
        testsuite_runs: set[tuple[str, int, str, str]] = set()
        for result in results:
            # Check that the testsuite build is in the list of builds of the runconfig
            if result.testsuite_build not in self.builds:
                raise ValueError(
                    f"The build named '{result.testsuite_build}' is not part of the list of builds of the runconfig"
                )
            try:
                testsuite_result: TestsuiteResults = self._get_or_create_testsuite_results(result)
                self._args_testsuites[result.testsuite_build] = testsuite_result
            except Build.DoesNotExist:
                raise ValueError(f"The build named '{result.testsuite_build}' does not exist")
            # Check that the testsuite run has not been added already
            testsuite_run_key_values: tuple[str, int, str, str] = (
                testsuite_result.name,
                result.testsuite_run_id,
                self.name,
                result.machine,
            )
            if testsuite_run_key_values in testsuite_runs:
                raise ValueError(
                    f"Attempted to import {testsuite_result.name}'s run ID {result.testsuite_run_id} twice "
                    f"on the runconfig '{self.name}' for the machine '{result.machine}'"
                )
            testsuite_runs.add(testsuite_run_key_values)
            # Import the results
            try:
                self.run_results.append(
                    testsuite_result.read_results(result.machine, result.testsuite_run_id, result.testsuite_run_path)
                )
            except FileNotFoundError:
                pass
            except Exception:
                logger.exception("Unexpected error while reading the results of run '%s'", result.testsuite_run_path)


    def _get_or_create_testsuite_results(self, result: TestSuiteRunDef) -> TestsuiteResults:
        """Get the testsuite results associated with the build name, or create them"""
        testsuite_result: TestsuiteResults | None = self._args_testsuites.get(result.testsuite_build)
        if testsuite_result is None:
            testsuite_name = Build.objects.get(name=result.testsuite_build).component.name
            # Create the testsuite results object
            testsuite_result = TestsuiteResults(
                self,
                testsuite_name,
                result.testsuite_build,
                self.result_url_pattern,
                result.results_format,
                result.results_format_version,
            )
        return testsuite_result


class ResultsCommitHandler:
    def __init__(self, runconfig_results: RunConfigResults):
        self._runconfig_results: RunConfigResults = runconfig_results

    @transaction.atomic
    def commit(
        self,
        new_machines_public: bool = False,
        new_tests_public: bool = False,
        new_machines_vetted: bool = False,
        new_tests_vetted: bool = False,
        new_statuses_vetted: bool = False,
    ) -> None:
        self._now = cast(datetime.datetime, timezone.now())
        self._fetch_builds_and_tags()
        self._fetch_or_create_runconfig()
        self._verify_duplicate_builds()
        self._add_new_builds()
        # Abort early if there is nothing else to do
        if len(self._runconfig_results.run_results) == 0:
            logger.info("No results to add, exiting...")
            return
        self._fetch_existing_testsuite_runs()
        self._create_missing_machines(new_machines_public, new_machines_vetted)
        self._create_missing_tests(new_tests_public, new_tests_vetted)
        self._create_missing_statuses(new_statuses_vetted)
        self._create_testsuite_runs()
        self._create_test_results_and_find_failures()
        self._fetch_issue_filter_associateds()
        self._map_failures()
        self._create_new_failures()
        # Create the statistics objects, but only if the run was not temporary.
        if not self._runconfig_results.temporary:
            self._update_statistics_and_corresponding_issues()
        if len(self._unknown_failures) > 0:
            self._fetch_archived_issue_filter_associateds()
            self._match_unknown_failures_to_archived_ifa()
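
    # Typical usage (illustrative; the path and flag are made up):
    #   handler = ResultsCommitHandler(RunConfigResultsFromDir("/results/CI_RUN_1234"))
    #   handler.commit(new_machines_vetted=True)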

    def _fetch_builds_and_tags(self) -> None:
        self._builds = self._fetch_object_by_names(Build, self._runconfig_results.builds)
        self._tags = self._fetch_object_by_names(RunConfigTag, self._runconfig_results.tags)

    def _fetch_or_create_runconfig(self) -> None:
        self._runconfig, created = RunConfig.objects.get_or_create(
            name=self._runconfig_results.name,
            defaults={
                "url": self._runconfig_results.url,
                "environment": self._runconfig_results.environment,
                "temporary": self._runconfig_results.temporary,
            },
        )
        self._runconfig_builds: set[Build]
        if created:
            # Add runconfig to all the tags
            for tag in self._tags.values():
                self._runconfig.tags.add(tag)
            # Runconfig builds are empty because we've just created the runconfig
            self._runconfig_builds = set()
            return
        self._runconfig_builds = set(self._runconfig.builds.all())

    def _verify_duplicate_builds(self) -> None:
        """Verify that we do not have two builds for the same component"""
        components: dict[Component, Build] = {}
        for build in set(self._builds.values()) | self._runconfig_builds:
            if build.component not in components:
                components[build.component] = build
            else:
                raise ValueError(
                    f"ERROR: Two builds ({build} and {components[build.component]}) "
                    f"cannot be from the same component ({build.component})"
                )

    def _add_new_builds(self) -> None:
        for new_build in set(self._builds.values()) - self._runconfig_builds:
            self._runconfig.builds.add(new_build)

    def _fetch_existing_testsuite_runs(self) -> None:
        """Load all the existing testsuite runs on this runconfig"""
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        testsuite_runs_query: QuerySet[TestsuiteRun] = TestsuiteRun.objects.filter(
            runconfig=self._runconfig
        ).prefetch_related("machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = (
            self._testsuite_runs_to_dict(testsuite_runs_query)
        )

    def _create_missing_machines(self, public: bool, vetted: bool) -> None:
        self._machines: dict[str, Machine] = self._create_missing(
            model=Machine,
            model_str="machine(s)",
            missing_objs=self._runconfig_results.machines,
            key_field="name",
            args={"public": public, "vetted_on": self._now if vetted else None},
        )

    def _create_missing_tests(self, public: bool, vetted: bool) -> None:
        self._tests: dict[str, dict[str, Test]] = {}
        current_tests_ids: list[int] = []
        for testsuite, tests in self._runconfig_results.tests.items():
            self._tests[testsuite.name] = self._create_missing(
                model=Test,
                model_str=f"test(s) ({testsuite})",
                missing_objs=tests,
                key_field="name",
                args={
                    "public": public,
                    "testsuite": testsuite,
                    "vetted_on": self._now if vetted else None,
                },
                filter={"testsuite": testsuite},
            )
            # If the runconfig is non-temporary, remember the tests so that it can be recorded as their first runconfig
            if not self._runconfig_results.temporary:
                current_tests_ids.extend([test.id for test in self._tests[testsuite.name].values()])
        if current_tests_ids:
            Test.objects.filter(pk__in=current_tests_ids, first_runconfig=None).update(first_runconfig=self._runconfig)

    def _create_missing_statuses(self, vetted: bool) -> None:
        self._statuses: dict[str, dict[str, TextStatus]] = {}
        for testsuite, text_status in self._runconfig_results.text_statuses.items():
            self._statuses[testsuite.name] = self._create_missing(
                model=TextStatus,
                model_str=f"status(es) ({testsuite})",
                missing_objs=text_status,
                key_field="name",
                args={"testsuite": testsuite, "vetted_on": self._now if vetted else None},
                filter={"testsuite": testsuite},
            )

    def _create_testsuite_runs(self) -> None:
        self._new_run_results: list[TestsuiteRunResults] = []
        to_create: list[TestsuiteRun] = []
        for run_result in self._runconfig_results.run_results:
            # Ignore the runs we already have
            if (
                self._testsuite_runs.get(run_result.machine_name, {})
                .get(run_result.run_id, {})
                .get(run_result.testsuite.name)
            ) is not None:
                continue
            # Create the TestsuiteRun object
            new_testsuite_run: TestsuiteRun = TestsuiteRun(
                testsuite=run_result.testsuite.db_object,
                runconfig=self._runconfig,
                machine=self._machines[run_result.machine_name],
                run_id=run_result.run_id,
                start=run_result.start_time,
                duration=run_result.duration,
            )
            to_create.append(new_testsuite_run)
            self._new_run_results.append(run_result)
        if not (to_create_len := len(to_create)):
            return
        logger.info("Adding %s testsuite runs", to_create_len)
        new_testsuite_runs: list[TestsuiteRun] = TestsuiteRun.objects.bulk_create(to_create)
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        prefetch_related_objects(new_testsuite_runs, "machine__tags", "runconfig__tags", "testsuite")
        self._testsuite_runs |= self._testsuite_runs_to_dict(new_testsuite_runs)

    def _create_test_results_and_find_failures(self) -> None:
        self._test_results: list[TestResult] = []
        self._failures: list[TestResult] = []
        for run in self._new_run_results:
            for result in run.test_results:
                testsuite = run.testsuite.db_object
                test_result = TestResult(
                    test=self._tests[testsuite.name][result.name],
                    ts_run=self._testsuite_runs[run.machine_name][run.run_id][run.testsuite.name],
                    status=self._statuses[testsuite.name][result.status],
                    start=result.start_time,
                    duration=result.duration,
                    command=result.command,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    dmesg=result.dmesg,
                    url=result.url,
                )
                self._test_results.append(test_result)
                if test_result.is_failure:
                    self._failures.append(test_result)
        if test_results_len := len(self._test_results):
            logger.info("Adding %s test results", test_results_len)
            self._test_results = TestResult.objects.bulk_create(self._test_results, batch_size=5000)

    def _fetch_issue_filter_associateds(self) -> None:
        """Fetch all the associated IssueFilters and their related issues"""
        self._ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        # NOTE: prefetch data that is later used for checking IssueFilter's coverage
        active_ifas: QuerySet[IssueFilterAssociated] = IssueFilterAssociated.objects_ready_for_matching.filter(
            deleted_on=None
        ).prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        for ifa in active_ifas:
            self._ifas[ifa.filter].append(ifa)

    def _map_failures(self) -> None:
        """
        Get the filter statistics that already exist on this runconfig and lock them for update.
        Then, for every result, update the statistics of each issue filter that covers it.
        """
        start: float = time.time()
        self._known_failures: list[KnownFailure] = []
        self._unknown_failures: list[UnknownFailure] = []
        self._fetch_and_lock_existing_filter_statistics()
        for result in self._test_results:
            found: bool = False
            for issue_filter in self._ifas:
                # Start matching the result to the current filter
                if issue_filter.covers(result):
                    # Get or create a statistics object for the current filter and runconfig
                    filter_stats: FilterStatsTuple = self._filters_stats_changes.get(
                        issue_filter, FilterStatsTuple(None, None)
                    )
                    if filter_stats.new is None:
                        if filter_stats.current is not None:
                            # Statistics exist in the DB: work on a copy of them
                            filter_stats = FilterStatsTuple(filter_stats.current, copy.copy(filter_stats.current))
                            self._filters_stats_changes[issue_filter] = filter_stats
                        else:
                            # No statistics yet: start from zeroed counters
                            filter_stats = FilterStatsTuple(
                                None,
                                RunFilterStatistic(
                                    filter=issue_filter, runconfig=self._runconfig, matched_count=0, covered_count=0
                                ),
                            )
                            self._filters_stats_changes[issue_filter] = filter_stats
                    filter_stats.new.covered_count += 1
                    if result not in self._failures:
                        continue
                    if issue_filter.matches(result):
                        filter_stats.new.matched_count += 1
                        for ifa in self._ifas[issue_filter]:
                            self._known_failures.append(KnownFailure(result=result, matched_ifa=ifa))
                        found = True
            if result in self._failures and not found:
                self._unknown_failures.append(UnknownFailure(result=result))
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s test failures (%s filters matched, %s failures left unmatched) in %.2f ms",
            len(self._failures),
            len(self._known_failures),
            len(self._unknown_failures),
            execution_time_ms,
        )

    def _fetch_and_lock_existing_filter_statistics(self) -> None:
        """Get the filter statistics already existing on this runconfig and lock them for update"""
        # Stores results as a namedtuple (current, new)
        self._filters_stats_changes: dict[IssueFilter, FilterStatsTuple] = {}
        if not self._runconfig_results.temporary:
            for stat in RunFilterStatistic.objects.select_for_update().filter(runconfig=self._runconfig):
                self._filters_stats_changes[stat.filter] = FilterStatsTuple(stat, None)

    def _create_new_failures(self) -> None:
        self._known_failures = KnownFailure.objects.bulk_create(self._known_failures)
        self._unknown_failures = UnknownFailure.objects.bulk_create(self._unknown_failures)

    @transaction.atomic
    def _update_statistics_and_corresponding_issues(self) -> None:
        # Create a transaction savepoint, in case the updating process fails
        try:
            self._update_statistics()
            self._update_issues()
        except Exception:
            logger.exception("Failed to update the statistics and the corresponding issues")

    def _update_statistics(self) -> None:
        """
        Save all the stats (WARNING: some might already exist in the DB, hence update_conflicts=True in bulk_create)
        """
        self._statistics: list[RunFilterStatistic] = [
            filter_stats.new for filter_stats in self._filters_stats_changes.values() if filter_stats.new is not None
        ]
        logger.info("Updating the statistics of %s filters", len(self._statistics))
        start: float = time.time()
        try:
            RunFilterStatistic.objects.bulk_create(
                self._statistics,
                update_conflicts=True,
                unique_fields=("runconfig", "filter"),
                update_fields=("covered_count", "matched_count"),
            )
        except Exception:
            logger.exception("Failed to update the filter statistics")
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Filter statistics updated in %.2f ms", execution_time_ms)

    def _update_issues(self) -> None:
        """Get the list of issues that need to be updated because the filter statistics got updated"""
        # NOTE: prefetch data that is later used for updating statistics
        self._issues: list[Issue] = list(
            Issue.objects.select_for_update()
            .filter(filters__in=self._filters_stats_changes.keys())
            .prefetch_related("filters")
        )
        logger.info("Updating the statistics of %s issues", len(self._issues))
        start: float = time.time()
        for issue in self._issues:
            self._issue_simple_stats_recomputing(issue, self._runconfig, self._filters_stats_changes)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info("Issue statistics updated in %.2f ms", execution_time_ms)

    def _fetch_archived_issue_filter_associateds(self) -> None:
        """Fetch all archived IFAs that are less than 6 months old"""
        self._archived_ifas: defaultdict[IssueFilter, list[IssueFilterAssociated]] = defaultdict(list)
        archived_threshold: datetime.datetime = self._now - datetime.timedelta(days=180)
        # NOTE: prefetch data that is later used for checking IssueFilter's matching
        # Keep only the IFAs that have a deletion date, and whose deletion date is within the last 180 days
        self._db_archived_ifas: QuerySet[IssueFilterAssociated] = (
            IssueFilterAssociated.objects_ready_for_matching.exclude(deleted_on=None)
            .exclude(deleted_on__lt=archived_threshold)
            .prefetch_related("filter__tests__testsuite", "filter__statuses__testsuite")
        )
        for archived_ifa in self._db_archived_ifas:
            self._archived_ifas[archived_ifa.filter].append(archived_ifa)

    def _match_unknown_failures_to_archived_ifa(self) -> None:
        """Try to match the unknown failures with archived IFAs"""
        start: float = time.time()
        filters_matching: set[IssueFilter] = set()
        for failure in self._unknown_failures:
            for issue_filter, ifas in self._archived_ifas.items():
                if issue_filter.matches(failure.result):
                    failure.matched_archived_ifas.update(ifas)
                    filters_matching.add(issue_filter)
        execution_time_ms: float = (time.time() - start) * 1000
        logger.info(
            "Found %s/%s recently-archived filters matching some unknown failures in %.2f ms",
            len(filters_matching),
            len(self._db_archived_ifas),
            execution_time_ms,
        )

    @staticmethod
    def _testsuite_runs_to_dict(
        testsuite_runs: Iterable[TestsuiteRun],
    ) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]]:
        testsuite_runs_dict: defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] = defaultdict(
            lambda: defaultdict(dict)
        )
        for testsuite_run in testsuite_runs:
            testsuite_runs_dict[testsuite_run.machine.name][testsuite_run.run_id][testsuite_run.testsuite.name] = (
                testsuite_run
            )
        return testsuite_runs_dict
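
    # The resulting mapping is machine name -> run ID -> testsuite name -> TestsuiteRun,
    # e.g. (made-up names): {"machine-1": {0: {"my-testsuite": <TestsuiteRun object>}}}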


    @staticmethod
    def _fetch_object_by_names(model: Model, names: Iterable[str] | str) -> dict[str, Model]:
        objects: dict[str, Model] = {}
        name: str
        try:
            # NOTE: check for str first, since a str is itself an Iterable
            if isinstance(names, str):
                name = names
                objects[name] = model.objects.get(name=name)
            elif isinstance(names, Iterable):
                for name in names:
                    objects[name] = model.objects.get(name=name)
        except model.DoesNotExist as e:
            raise ValueError(f"The object {name} does not exist in the database") from e
        return objects


    @staticmethod
    def _create_missing(
        model: Model,
        model_str: str,
        missing_objs: set,
        key_field: str,
        args: dict[str, Any],
        filter: dict[str, Any] | None = None,
    ) -> dict[Any, Model]:
        if filter is None:
            filter = {}
        # Fetch the current list of objects
        db_objs: list[Model] = list(model.objects.filter(**filter))
        db_objs_key_fields: set = {getattr(obj, key_field) for obj in db_objs}

        # Create the missing objects
        to_create: list[Model] = []
        for obj in missing_objs - db_objs_key_fields:
            args[key_field] = obj
            to_create.append(model(**args))
        if to_create_len := len(to_create):
            logger.info("Adding %s missing %s", to_create_len, model_str)
            created_objs: list[Model] = model.objects.bulk_create(to_create)
            all_needed_objs: dict[Any, Model] = {
                getattr(created_obj, key_field): created_obj for created_obj in created_objs
            }
        else:
            all_needed_objs = {}

        # Add previously existing objects to the returned dictionary
        for db_obj in db_objs:
            key = getattr(db_obj, key_field)
            if key in missing_objs:
                all_needed_objs[key] = db_obj
        return all_needed_objs

    @staticmethod
    def _issue_simple_stats_recomputing(
        issue: Issue, runconfig: RunConfig, stats_changes: dict[IssueFilter, FilterStatsTuple]
    ) -> None:
        """
        Iterate through all the filters associated with this issue and check whether a filter's statistics changed
        from not covered/affected to covered/affected. If so, and if the issue was not already covered/affected,
        update the statistics by incrementing the relevant stats field directly. Since we are only adding results,
        we can never be in a situation where a count needs to be decremented.

        WARNING: This function is tailored for the purpose of adding results!
        DO NOT USE AS A LIGHTWEIGHT REPLACEMENT FOR Issue.update_statistics()
        """
        was_covered: bool = False
        has_new_filter_covering: bool = False
        was_affected: bool = False
        has_new_filter_matched: bool = False
        previous: RunFilterStatistic | None
        new: RunFilterStatistic | None
        for issuefilter in issue.filters.all():
            previous, new = stats_changes.get(issuefilter, FilterStatsTuple(None, None))
            # If we have no previous stats for the filter, just fake empty ones
            if previous is None:
                previous = RunFilterStatistic(filter=issuefilter, runconfig=runconfig, matched_count=0, covered_count=0)
            # Check if the issue was covered/affected for this runconfig before
            if previous.covered_count > 0:
                was_covered = True
            if previous.matched_count > 0:
                was_affected = True
                # OPTIMIZATION: The issue was already affected, so no changes in
                # statistics could come by adding more results
                return
            if new is not None:
                if previous.covered_count == 0 and new.covered_count > 0:
                    has_new_filter_covering = True
                if previous.matched_count == 0 and new.matched_count > 0:
                    has_new_filter_matched = True
        # Update the covered/affected count if necessary
        changed: bool = False
        if not was_covered and has_new_filter_covering:
            issue.runconfigs_covered_count += 1
            changed = True
        if not was_affected and has_new_filter_matched:
            issue.runconfigs_affected_count += 1
            issue.last_seen = runconfig.added_on
            issue.last_seen_runconfig = runconfig
            changed = True
        if changed:
            issue.save()

    @property
    @none_if_undefined
    def commit_time(self) -> datetime.datetime | None:
        return self._now

    @property
    @none_if_undefined
    def builds(self) -> dict[str, Build] | None:
        return self._builds

    @property
    @none_if_undefined
    def tags(self) -> dict[str, RunConfigTag] | None:
        return self._tags

    @property
    @none_if_undefined
    def runconfig(self) -> RunConfig | None:
        return self._runconfig

    @property
    @none_if_undefined
    def testsuite_runs(self) -> defaultdict[str, defaultdict[int, dict[str, TestsuiteRun]]] | None:
        return self._testsuite_runs

    @property
    @none_if_undefined
    def machines(self) -> dict[str, Machine] | None:
        return self._machines

    @property
    @none_if_undefined
    def tests(self) -> dict[str, dict[str, Test]] | None:
        return self._tests

    @property
    @none_if_undefined
    def statuses(self) -> dict[str, dict[str, TextStatus]] | None:
        return self._statuses

    @property
    @none_if_undefined
    def test_results(self) -> list[TestResult] | None:
        return self._test_results

    @property
    @none_if_undefined
    def failures(self) -> list[TestResult] | None:
        return self._failures

    @property
    @none_if_undefined
    def ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._ifas

    @property
    @none_if_undefined
    def known_failures(self) -> list[KnownFailure] | None:
        return self._known_failures

    @property
    @none_if_undefined
    def unknown_failures(self) -> list[UnknownFailure] | None:
        return self._unknown_failures

    @property
    @none_if_undefined
    def statistics(self) -> list[RunFilterStatistic] | None:
        return self._statistics

    @property
    @none_if_undefined
    def issues(self) -> list[Issue] | None:
        return self._issues

    @property
    @none_if_undefined
    def archived_ifas(self) -> defaultdict[IssueFilter, list[IssueFilterAssociated]] | None:
        return self._archived_ifas