Coverage for CIResults/tests/test_models.py: 100%

939 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 13:11 +0000

1from unittest.mock import patch, MagicMock, PropertyMock 

2from django.test import TestCase 

3from django.utils import timezone 

4from django.core.exceptions import ValidationError 

5from django.contrib.auth import get_user_model 

6 

7from CIResults.models import BugTracker, Bug, BugTrackerSLA, Person, BugTrackerAccount, BugComment 

8from CIResults.models import Test, Machine, RunConfigTag, RunConfig, TestSuite, TextStatus 

9from CIResults.models import TestResult, IssueFilter, IssueFilterAssociated, Issue, RunFilterStatistic 

10from CIResults.models import MachineTag, TestsuiteRun, UnknownFailure, KnownFailure, Component, Build 

11from CIResults.models import Rate, ReplicationScript, script_validator 

12 

13from model_bakery import baker 

14 

15import datetime 

16import pytz 

17 

18 

class BugTrackerSLATests(TestCase):
    """Checks the human-readable rendering of a BugTrackerSLA entry."""

    def test___str__(self):
        # 10 weeks are rendered by timedelta as 70 days
        sla_duration = datetime.timedelta(weeks=10, seconds=23)
        sla = BugTrackerSLA(tracker=BugTracker(name="BugTracker"), priority="high", SLA=sla_duration)

        self.assertEqual(str(sla), "BugTracker: high -> 70 days, 0:00:23")

24 

25 

class PersonTests(TestCase):
    """str(Person) for every combination of full name and email being provided."""

    def test_full(self):
        rendered = str(Person(full_name="John Doe", email="john.doe@isp.earth"))
        self.assertEqual(rendered, "John Doe <john.doe@isp.earth>")

    def test_full_name_only_only(self):
        rendered = str(Person(full_name="John Doe"))
        self.assertEqual(rendered, "John Doe")

    def test_email_only(self):
        rendered = str(Person(email="john.doe@isp.earth"))
        self.assertEqual(rendered, "john.doe@isp.earth")

    def test_no_information(self):
        # Neither field set: a placeholder string is expected
        rendered = str(Person())
        self.assertEqual(rendered, "(No name or email)")

42 

43 

class TestBugTrackerAccount(TestCase):
    """A BugTrackerAccount is expected to render exactly like the person it belongs to."""

    def test_str(self):
        owner = Person(full_name="John Doe", email="john.doe@isp.earth")
        account = BugTrackerAccount(person=owner)

        self.assertEqual(str(account), str(owner))

48 

49 

class BugTrackerTests(TestCase):
    """Unit tests for BugTracker: backend selection, polling and bug-set queries."""

    def test_str(self):
        fdo = BugTracker(
            name="Freedesktop.org",
            short_name="fdo",
            separator="#",
            public=True,
            url="https://bugs.freedesktop.org/",
            bug_base_url="https://bugs.freedesktop.org/show_bug.cgi?id=",
        )
        self.assertEqual(str(fdo), "Freedesktop.org")

    @patch(
        'CIResults.models.BugTrackerSLA.objects.filter',
        return_value=[
            MagicMock(priority="P1", SLA=datetime.timedelta(seconds=1)),
            MagicMock(priority="P2", SLA=datetime.timedelta(seconds=3)),
            MagicMock(priority="P3", SLA=datetime.timedelta(seconds=2)),
        ],
    )
    def test_SLAs_cached(self, filter_mocked):
        tracker = BugTracker(name="Tracker1", public=True)
        cached = tracker.SLAs_cached

        filter_mocked.assert_called_with(tracker=tracker)

        # The mapping is expected to be keyed by the lower-cased priority
        sla_p1, sla_p2, sla_p3 = filter_mocked.return_value
        self.assertEqual(cached, {"p1": sla_p1.SLA, "p2": sla_p2.SLA, "p3": sla_p3.SLA})

    @patch('CIResults.models.BugTracker.tracker')
    def test_poll(self, tracker_mock):
        bug = MagicMock()
        bug.save = MagicMock()

        tracker = BugTracker()
        tracker.tracker._get_tracker_time = MagicMock(return_value=timezone.now())
        tracker.poll(bug)

        # The backend does the polling; the bug must not be saved by poll()
        tracker_mock.poll.assert_called_with(bug, False)
        bug.save.assert_not_called()

        elapsed = timezone.now() - bug.polled
        self.assertLess(elapsed, datetime.timedelta(seconds=.1))

    @patch('CIResults.bugtrackers.Bugzilla')
    def test_tracker__bugzilla(self, bugzilla_mock):
        backend = BugTracker(tracker_type="bugzilla").tracker
        self.assertEqual(backend, bugzilla_mock.return_value)

    @patch('CIResults.bugtrackers.Jira')
    def test_tracker__jira(self, jira_mock):
        backend = BugTracker(tracker_type="jira").tracker
        self.assertEqual(backend, jira_mock.return_value)

    @patch('CIResults.bugtrackers.GitLab')
    def test_tracker__gitlab(self, gitlab_mock):
        backend = BugTracker(tracker_type="gitlab").tracker
        self.assertEqual(backend, gitlab_mock.return_value)

    @patch('CIResults.bugtrackers.Untracked')
    def test_tracker__jira_untracked(self, untracked_mock):
        backend = BugTracker(tracker_type="jira_untracked").tracker
        self.assertEqual(backend, untracked_mock.return_value)

    def test_tracker__invalid_name(self):
        tracker = BugTracker(tracker_type="invalid_name")

        # Merely accessing the attribute must raise
        with self.assertRaisesMessage(ValueError, "The bugtracker type 'invalid_name' is unknown"):
            _ = tracker.tracker

    @patch.object(BugTracker, 'tracker', PropertyMock())
    def test_open_statuses(self):
        tracker = BugTracker(tracker_type="tracker")
        self.assertEqual(tracker.open_statuses, BugTracker.tracker.open_statuses)

    @patch('CIResults.models.Bug.objects.filter', return_value=[MagicMock(), MagicMock(), MagicMock()])
    @patch('CIResults.models.BugTracker.poll')
    def test_poll_all(self, poll_mocked, bugs_mocked):
        tracker = BugTracker(tracker_type="untracked", public=True)
        tracker.poll_all()

        # The bugs to poll must have been looked up for this tracker
        bugs_mocked.assert_called_with(tracker=tracker)

        # Each of the three bugs returned by the lookup must have been polled
        self.assertEqual(poll_mocked.call_count, 3)
        for bug in bugs_mocked.return_value:
            poll_mocked.assert_any_call(bug)

        seconds_since_poll = abs((timezone.now() - tracker.polled).total_seconds())
        self.assertLess(seconds_since_poll, .1)

    @patch('CIResults.models.BugTracker.poll')
    def test_poll_all__custom_list(self, poll_mocked):
        custom_bugs = [MagicMock(), MagicMock(), MagicMock()]

        tracker = BugTracker(tracker_type="untracked", public=True)
        tracker.poll_all(bugs=custom_bugs)

        # Each bug of the caller-provided list must have been polled
        self.assertEqual(poll_mocked.call_count, 3)
        for bug in custom_bugs:
            poll_mocked.assert_any_call(bug)

        seconds_since_poll = abs((timezone.now() - tracker.polled).total_seconds())
        self.assertLess(seconds_since_poll, 1)

    @patch('CIResults.models.Bug.objects.filter', return_value=[MagicMock(), MagicMock(), MagicMock()])
    @patch('CIResults.models.BugTracker.poll')
    def test_poll_all_interrupt(self, poll_mocked, bugs_mocked):
        tracker = BugTracker()

        # The stop event reports "set" on the third query
        stop_event = MagicMock(is_set=MagicMock(side_effect=[False, False, True]))
        tracker.poll_all(stop_event)

        # The bugs to poll must have been looked up for this tracker
        bugs_mocked.assert_called_with(tracker=tracker)

        # Only the first two bugs are expected to have been polled
        self.assertEqual(poll_mocked.call_count, 2)
        for bug in bugs_mocked.return_value[:2]:
            poll_mocked.assert_any_call(bug)

    def test_components_followed_list(self):
        tracker = BugTracker(public=True)
        self.assertEqual(tracker.components_followed_list, [])

        # Whitespace around the comma-separated entries is expected to be dropped
        tracker.components_followed = "COMPONENT1,COMPONENT2, COMPONENT3"
        self.assertEqual(tracker.components_followed_list, ["COMPONENT1", "COMPONENT2", "COMPONENT3"])

    def test_get_or_create_bugs(self):
        tracker = BugTracker.objects.create(name='tracker', tracker_type='jira_untracked', public=True)
        for existing_id in ('1', '3'):
            Bug.objects.create(tracker=tracker, bug_id=existing_id)

        # Only bug '2' is missing, so exactly one row should be added
        self.assertEqual(Bug.objects.count(), 2)
        tracker.get_or_create_bugs({'1', '2', '3'})
        self.assertEqual(Bug.objects.count(), 3)

    @patch('CIResults.models.BugTracker.open_statuses', new_callable=PropertyMock)
    def test_is_bug_open(self, open_statuses_mock):
        open_statuses_mock.return_value = ["status1", "status3"]
        tracker = BugTracker(public=True)

        expectations = [("status1", True), ("status2", False), ("status3", True), (None, False)]
        for status, is_open in expectations:
            bug = Bug(tracker=tracker, status=status)
            self.assertEqual(tracker.is_bug_open(bug), is_open,
                             "{}: Should be opened? {}".format(bug.status, is_open))

    @patch('CIResults.models.Bug.objects.filter', return_value={MagicMock(spec=Bug, bug_id='existing1'),
                                                                MagicMock(spec=Bug, bug_id='existing2')})
    def test_open_bugs__without_followed_list(self, bugs_filter_mocked):
        tracker = BugTracker.objects.create(name='tracker', tracker_type='jira_untracked', public=True)

        # Without followed components, only the locally-known open bugs are expected
        self.assertEqual(tracker.open_bugs(), bugs_filter_mocked.return_value)
        bugs_filter_mocked.assert_called_with(tracker=tracker, status__in=tracker.open_statuses)

    @patch('CIResults.models.BugTracker.get_or_create_bugs', return_value={MagicMock(spec=Bug,
                                                                                     bug_id='searched_bug')})
    @patch('CIResults.bugtrackers.Untracked.search_bugs_ids')
    @patch('CIResults.models.Bug.objects.filter', return_value={MagicMock(spec=Bug, bug_id='existing1'),
                                                                MagicMock(spec=Bug, bug_id='existing2')})
    def test_open_bugs__with_followed_list(self, bugs_filter_mocked, search_bugs_ids_mocked,
                                           get_or_create_bugs_mocked):
        tracker = BugTracker.objects.create(name='tracker', tracker_type='jira_untracked', public=True,
                                            components_followed="product1,product2")

        # Locally-known open bugs plus the ones found by searching the followed components
        expected = bugs_filter_mocked.return_value | get_or_create_bugs_mocked.return_value
        self.assertEqual(tracker.open_bugs(), expected)

        bugs_filter_mocked.assert_called_with(tracker=tracker, status__in=tracker.open_statuses)
        search_bugs_ids_mocked.assert_called_with(components=["product1", "product2"], status=tracker.open_statuses)

    @patch('CIResults.models.BugTracker.open_bugs')
    def test_followed_bugs(self, open_bugs_mock):
        tracker1 = BugTracker.objects.create(name='tracker1', tracker_type='jira_untracked', public=True)
        tracker2 = BugTracker.objects.create(name='tracker2', tracker_type='jira_untracked', public=True)

        # Bugs reported as "open" by the (mocked) open_bugs()
        open_bugs = {Bug.objects.create(bug_id='1234', tracker=tracker1),
                     Bug.objects.create(bug_id='1235', tracker=tracker1)}
        open_bugs_mock.return_value = open_bugs

        # Bugs that get referenced by issues, plus one that nothing references
        bug1 = Bug.objects.create(bug_id='1', tracker=tracker1)
        bug2 = Bug.objects.create(bug_id='2', tracker=tracker2)
        bug_old_issue = Bug.objects.create(bug_id='OLD_ISSUE', tracker=tracker1)
        Bug.objects.create(bug_id='UNREFERENCED', tracker=tracker2)

        active_issue = Issue.objects.create()
        active_issue.bugs.add(bug1, bug2)

        archived_issue = Issue.objects.create(archived_on=timezone.now())
        archived_issue.bugs.add(bug2, bug_old_issue)

        # Each tracker should only report its own bug from the active issue, plus the open bugs
        self.assertEqual(tracker1.followed_bugs(), {bug1} | open_bugs)
        self.assertEqual(tracker2.followed_bugs(), {bug2} | open_bugs)

    @patch('CIResults.models.BugTracker.get_or_create_bugs')
    @patch('CIResults.bugtrackers.Untracked.search_bugs_ids')
    @patch('CIResults.models.BugTracker.bugs_in_issues')
    @patch('CIResults.models.timezone')
    def test_updated_bugs(self, tz_mock, bugs_in_issues_mock, search_bugs_mock, get_or_create_mock):
        epoch = datetime.datetime.fromtimestamp(0, pytz.utc)

        tracker1 = BugTracker.objects.create(name='tracker1', tracker_type='jira_untracked',
                                             public=True, components_followed="FOO")
        tracker1.tracker.open_statuses = ["Open"]
        tracker1.polled = epoch
        # The tracker's clock is 10 seconds ahead of the local (mocked) clock
        tracker1.tracker._get_tracker_time = MagicMock(return_value=tracker1.polled + datetime.timedelta(seconds=20))
        tz_mock.now.return_value = tracker1.polled + datetime.timedelta(seconds=10)

        all_upd_bugs_ids = {"bug2", "bug3", "bug4"}
        open_bugs_ids = {"bug1", "bug3"}

        issue_bug1 = Bug.objects.create(bug_id='bug4', status="Closed", tracker=tracker1)
        issue_bug2 = Bug.objects.create(bug_id='bug5', status="Closed", tracker=tracker1)

        Bug.objects.create(bug_id='bug1', status="Open", tracker=tracker1)
        Bug.objects.create(bug_id='bug2', status="Closed", tracker=tracker1)
        Bug.objects.create(bug_id='bug3', status="Open", tracker=tracker1)

        # First search call returns the updated bugs, the second one the open bugs
        search_bugs_mock.side_effect = [all_upd_bugs_ids, open_bugs_ids]
        bugs_in_issues_mock.return_value = {issue_bug1, issue_bug2}

        tracker1.updated_bugs()

        first_search_kwargs = search_bugs_mock.call_args_list[0][1]
        self.assertEqual(epoch + datetime.timedelta(seconds=10), first_search_kwargs['updated_since'])

        # bug1 - (not returned) open but not updated
        # bug2 - (not returned) updated but not open
        # bug3 - (returned) updated and open
        # bug4 - (returned) not open, but updated and associated to issue
        # bug5 - (not returned) not open or updated
        get_or_create_mock.assert_called_with({"bug3", "bug4"})

    @patch('CIResults.models.BugTracker.get_or_create_bugs')
    @patch('CIResults.bugtrackers.Untracked.search_bugs_ids')
    @patch('CIResults.models.BugTracker.bugs_in_issues')
    @patch('CIResults.models.timezone')
    def test_unreplicated_bugs(self, tz_mock, bugs_in_issues_mock, search_bugs_mock, get_or_create_mock):
        tracker1 = BugTracker.objects.create(name='tracker1', tracker_type='jira_untracked',
                                             public=True, components_followed="FOO")
        tracker1.tracker.open_statuses = ["Open"]

        parent_open = Bug.objects.create(bug_id='bug1', status="Open", tracker=tracker1)
        parent_closed = Bug.objects.create(bug_id='bug2', status="Closed", tracker=tracker1)

        # bug3 and bug4 have a parent, bug5 is open and has none
        Bug.objects.create(bug_id='bug3', status="Open", tracker=tracker1, parent=parent_open)
        Bug.objects.create(bug_id='bug4', status="Closed", tracker=tracker1, parent=parent_closed)
        Bug.objects.create(bug_id='bug5', status="Open", tracker=tracker1)

        tracker1.unreplicated_bugs()
        get_or_create_mock.assert_called_with({"bug5"})

288 

289 

class BugTests(TestCase):
    """Unit tests for Bug: naming, list properties, SLA computations, update flags and creation."""

    Model = Bug

    def setUp(self):
        # Unsaved tracker/bug pair; tests needing database rows save them explicitly
        self.tracker = BugTracker(name="Freedesktop.org", short_name="fdo",
                                  separator="#", public=True,
                                  url="https://bugs.freedesktop.org/",
                                  bug_base_url="https://bugs.freedesktop.org/show_bug.cgi?id=")
        self.bug = Bug(tracker=self.tracker, bug_id="1234", title="random title",
                       created=timezone.now() - datetime.timedelta(days=4))

    def test_short_name(self):
        self.assertEqual(self.bug.short_name, "fdo#1234")

    def test_url(self):
        self.assertEqual(self.bug.url,
                         "https://bugs.freedesktop.org/show_bug.cgi?id=1234")

    def test_features_list(self):
        self.assertEqual(self.bug.features_list, [])
        self.bug.features = "feature1,feature2 , feature3"
        self.assertEqual(self.bug.features_list, ["feature1", "feature2", "feature3"])

    def test_platforms_list(self):
        self.assertEqual(self.bug.platforms_list, [])
        self.bug.platforms = "platform1,platform2 , platform3"
        self.assertEqual(self.bug.platforms_list, ["platform1", "platform2", "platform3"])

    def test_tags_list(self):
        self.assertEqual(self.bug.tags_list, [])
        # Spaces inside a tag are kept, only the surrounding ones are expected to be stripped
        self.bug.tags = "tag1,tag 2 , tag3"
        self.assertEqual(self.bug.tags_list, ["tag1", "tag 2", "tag3"])

    def test_has_new_comments(self):
        self.bug.updated = timezone.now()

        # comments_polled = None
        self.assertTrue(self.bug.has_new_comments)

        # comments_polled < updated
        self.bug.comments_polled = self.bug.updated - datetime.timedelta(seconds=1)
        self.assertTrue(self.bug.has_new_comments)

        # comments_polled > updated
        self.bug.comments_polled = self.bug.updated + datetime.timedelta(seconds=1)
        self.assertFalse(self.bug.has_new_comments)

    @patch('CIResults.models.BugComment.objects.filter')
    def test_comments_cached(self, filter_mock):
        self.assertEqual(self.bug.comments_cached, filter_mock.return_value.prefetch_related())
        filter_mock.assert_called_with(bug=self.bug)

    def test_SLA(self):
        tracker = BugTracker.objects.create(name="BugTracker", public=True)
        sla_high = BugTrackerSLA.objects.create(tracker=tracker, priority="HIGH", SLA=datetime.timedelta(seconds=23))
        sla_low = BugTrackerSLA.objects.create(tracker=tracker, priority="low", SLA=datetime.timedelta(seconds=30))

        # Priorities are expected lower-cased in the cached mapping
        self.assertEqual(tracker.SLAs_cached, {"low": sla_low.SLA, "high": sla_high.SLA})

        bug = Bug(tracker=tracker, priority="low")
        self.assertEqual(bug.SLA, sla_low.SLA)

        # The lookup is expected to ignore the case of the bug's priority
        bug = Bug(tracker=tracker, priority="LOW")
        self.assertEqual(bug.SLA, sla_low.SLA)

        # Unknown priorities are expected to get an "infinite" SLA
        bug = Bug(tracker=tracker, priority="invalid")
        self.assertEqual(bug.SLA, datetime.timedelta.max)

    def test_SLA_deadline__triage_needed(self):
        # Check that if no developer has updated the bug, our deadline is set to the tracker's first_response_SLA
        self.tracker.save()
        self.bug.save()
        self.tracker.first_response_SLA = datetime.timedelta(days=2.1)
        self.assertEqual(self.bug.SLA_deadline, self.bug.created + self.tracker.first_response_SLA)

    def test_SLA_deadline__normal_SLA(self):
        # Check that when we have a developer comment, we follow the SLA
        self.bug.SLA = datetime.timedelta(days=1)
        self.bug.last_updated_by_developer = timezone.now()
        self.assertAlmostEqual(self.bug.SLA_deadline.timestamp(),
                               (self.bug.last_updated_by_developer + self.bug.SLA).timestamp(),
                               places=1)

    def test_SLA_deadline__infinite_SLA(self):
        # In the event where the SLA is infinite, verify that we always set the deadline a year in advance
        self.bug.SLA = datetime.timedelta.max
        self.bug.last_updated_by_developer = timezone.now() - datetime.timedelta(days=30)
        self.assertAlmostEqual(self.bug.SLA_deadline.timestamp(),
                               (timezone.now() + datetime.timedelta(days=365, seconds=1)).timestamp(),
                               places=1)

    def test_SLA_remaining_time__one_day_over(self):
        # Deadline one day in the past: the expected remaining time is minus one day.
        # (This body used to be named "one_day_left", swapped with the test below.)
        self.bug.SLA_deadline = timezone.now() - datetime.timedelta(days=1)
        self.assertAlmostEqual(self.bug.SLA_remaining_time.total_seconds(),
                               datetime.timedelta(days=-1).total_seconds(), places=1)

    def test_SLA_remaining_time__one_day_left(self):
        # Deadline one day in the future: the expected remaining time is plus one day (within a second)
        self.bug.SLA_deadline = timezone.now() + datetime.timedelta(days=1)
        self.assertLessEqual(abs(self.bug.SLA_remaining_time.total_seconds() -
                                 datetime.timedelta(days=1).total_seconds()), 1)

    def test_SLA_remaining_str__one_day_over(self):
        self.bug.SLA_remaining_time = datetime.timedelta(days=-1)
        exp = "1 day, 0:00:00 ago"
        self.assertEqual(exp, self.bug.SLA_remaining_str)

    def test_SLA_remaining_str__one_day_left(self):
        self.bug.SLA_remaining_time = datetime.timedelta(days=1)
        exp = "in 1 day, 0:00:00"
        self.assertEqual(exp, self.bug.SLA_remaining_str)

    def test_effective_priority(self):
        self.bug.SLA_remaining_time = datetime.timedelta(days=3)
        self.bug.SLA = datetime.timedelta(hours=2.5)
        self.assertAlmostEqual(self.bug.effective_priority, -self.bug.SLA_remaining_time / self.bug.SLA)

    def test_is_being_updated__never_flagged(self):
        self.assertFalse(self.bug.is_being_updated)

    def test_is_being_updated__not_expired(self):
        self.bug.flagged_as_update_pending_on = timezone.now()
        self.assertTrue(self.bug.is_being_updated)

    def test_is_being_updated__expired(self):
        # Flagged exactly one timeout ago: the flag is expected to have expired
        self.bug.flagged_as_update_pending_on = timezone.now() - self.bug.UPDATE_PENDING_TIMEOUT
        self.assertFalse(self.bug.is_being_updated)

    def test_update_pending_expires_in__never_flagged(self):
        self.assertEqual(self.bug.update_pending_expires_in, None)

    # The clock is frozen so that the expected durations can be compared exactly
    @patch('django.utils.timezone.now',
           return_value=datetime.datetime.strptime('2019-01-01T00:00:05', "%Y-%m-%dT%H:%M:%S"))
    def test_update_pending_expires_in__not_expired(self, now_mocked):
        self.bug.flagged_as_update_pending_on = timezone.now()
        self.assertEqual(self.bug.update_pending_expires_in, self.bug.UPDATE_PENDING_TIMEOUT)

    @patch('django.utils.timezone.now',
           return_value=datetime.datetime.strptime('2019-01-01T00:00:05', "%Y-%m-%dT%H:%M:%S"))
    def test_update_pending_expires_in__expired(self, now_mocked):
        self.bug.flagged_as_update_pending_on = timezone.now() - 2 * self.bug.UPDATE_PENDING_TIMEOUT
        self.assertEqual(self.bug.update_pending_expires_in, -self.bug.UPDATE_PENDING_TIMEOUT)

    @patch('CIResults.models.BugTracker.poll')
    def test_poll(self, poll_mock):
        self.bug.poll()
        poll_mock.assert_called_with(self.bug, False)

    def test_create(self):
        tracker = BugTracker.objects.create(name="Tracker2", tracker_type="jira", url="http://foo",
                                            project="TEST2", public=True)
        bug = Bug(tracker=tracker, title="random title")
        # The backend's create_bug() return value is expected to become the bug's id
        bug.tracker.tracker.create_bug = MagicMock(return_value=1)
        bug.create()
        self.assertEqual(bug.bug_id, 1)

    def test_create_error(self):
        tracker = BugTracker.objects.create(name="Tracker2", tracker_type="jira", url="http://foo",
                                            project="TEST2", public=True)
        bug = Bug(tracker=tracker, title="random title")
        # A ValueError from the backend must not escape create() nor set the id
        bug.tracker.tracker.create_bug = MagicMock(side_effect=ValueError)
        bug.create()
        self.assertNotEqual(bug.bug_id, 1)

    def test_save_with_dict_in_custom_field(self):
        tracker = BugTracker.objects.create(name="BugTracker", public=True)
        with self.assertRaisesMessage(ValueError,
                                      'Values stored in custom_fields cannot be tuples, lists, dictionaries'):
            Bug.objects.create(tracker=tracker, custom_fields={"field": {"toto": "gaga"}})

    def test_save_with_list_in_custom_field(self):
        tracker = BugTracker.objects.create(name="BugTracker", public=True)
        with self.assertRaisesMessage(ValueError,
                                      'Values stored in custom_fields cannot be tuples, lists, dictionaries'):
            Bug.objects.create(tracker=tracker, custom_fields={"field": [0, 1, 2, 3]})

    def test_save_with_tuples_in_custom_field(self):
        tracker = BugTracker.objects.create(name="BugTracker", public=True)
        with self.assertRaisesMessage(ValueError,
                                      'Values stored in custom_fields cannot be tuples, lists, dictionaries'):
            Bug.objects.create(tracker=tracker, custom_fields={"field": (0, 1, 2, 3)})

    def test_update_from_dict(self):
        upd_dict = {'severity': 'High', 'priority': 'Medium', 'non-existant': 42,
                    'id': 42, 'bug_id': 42, 'tracker_id': 42, 'tracker': 42,
                    'parent_id': 42, 'parent': 42}
        self.bug.update_from_dict(upd_dict)

        self.assertEqual(self.bug.severity, 'High')
        self.assertEqual(self.bug.priority, 'Medium')
        # None of the keys carrying the value 42 may have been applied to the bug
        self.assertNotIn(42, self.bug.__dict__.values())

    def test_str(self):
        self.assertEqual(str(self.bug), "fdo#1234 - random title")

483 

484 

class TestReplicationScript(TestCase):
    """Tests for ReplicationScript's string form and the script_validator helper."""

    def test_str(self):
        source = BugTracker.objects.create(name="Freedesktop.org", short_name="fdo", separator="#", public=False)
        destination = BugTracker.objects.create(name="JIRA", short_name="jira", separator="#", public=False)

        script = ReplicationScript.objects.create(name="My Script", source_tracker=source,
                                                  destination_tracker=destination)
        self.assertEqual(str(script), "<replication script 'My Script'>")

    def test_script_validator(self):
        # Syntactically valid code is expected to be returned unchanged
        valid_source = "def foo(): pass"
        self.assertEqual(script_validator(valid_source), valid_source)

    def test_script_validator_error(self):
        # An unterminated definition must be rejected
        with self.assertRaises(ValidationError):
            script_validator("def foo(")

502 

503 

class TestBugComment(TestCase):
    """Checks that a BugComment renders as "<bug>'s comment by <account>"."""

    def test_str(self):
        author = BugTrackerAccount(person=Person(full_name="John Doe", email="john.doe@isp.earth"))

        fdo = BugTracker(name="Freedesktop.org", short_name="fdo", separator="#")
        four_days_ago = timezone.now() - datetime.timedelta(days=4)
        bug = Bug(tracker=fdo, bug_id="1234", title="random title", created=four_days_ago)

        comment = BugComment(bug=bug, account=author)
        self.assertEqual(str(comment), "{}'s comment by {}".format(bug, author))

514 

515 

class TestBuild(TestCase):
    """Checks how Build.url is derived from version, repo and upstream_url."""

    def setUp(self):
        self.component = Component.objects.create(name='component', public=True)

    def test_url(self):
        # Nothing set: empty URL
        bare = Build(component=self.component, name='build')
        self.assertEqual(bare.url, '')

        # Only a version: the version is used as-is
        versioned = Build(component=self.component, name='build', version='version')
        self.assertEqual(versioned.url, 'version')

        # Version and repository: "<version> @ <repo>"
        with_repo = Build(component=self.component, name='build', repo='git://repo.com/repo',
                          version='version')
        self.assertEqual(with_repo.url, 'version @ git://repo.com/repo')

        # An upstream URL takes precedence over everything else
        with_upstream = Build(component=self.component, name='build',
                              upstream_url='https://repo.com/commit/version',
                              repo='git://repo.com/repo', version='version')
        self.assertEqual(with_upstream.url, 'https://repo.com/commit/version')

531 

532 

class VettableObjectMixin:
    """Shared vet()/suppress() tests for test cases that register an object via setUpVettableObject()."""

    def setUpVettableObject(self, vettable_object):
        # Expected to be called from the concrete test case's setUp()
        self.vettable_object = vettable_object

    def test_vet(self):
        target = self.vettable_object

        self.assertFalse(target.vetted)
        target.vet()
        self.assertTrue(target.vetted)

        # Vetting a second time must be refused
        with self.assertRaisesMessage(ValueError, 'The object is already vetted'):
            target.vet()

    def test_suppress(self):
        target = self.vettable_object
        target.vetted_on = timezone.now()

        self.assertTrue(target.vetted)
        target.suppress()
        self.assertFalse(target.vetted)

        # Suppressing a second time must be refused
        with self.assertRaisesMessage(ValueError, 'The object is already suppressed'):
            target.suppress()

554 

555 

class TestTests(TestCase, VettableObjectMixin):
    """Tests for the Test model: string form, active-IFA lookup and renaming."""

    def setUp(self):
        self.ts = TestSuite.objects.create(name="testsuite", description="nothing", public=True)
        self.test = Test.objects.create(testsuite=self.ts, name="test", public=True)

        self.setUpVettableObject(self.test)

    def test__str__(self):
        expected = "{}: {}".format(self.ts.name, self.test.name)
        self.assertEqual(str(self.test), expected)

    @patch('CIResults.models.IssueFilterAssociated.objects.filter')
    def test_in_active_ifas(self, mock_filter):
        # Only the lookup arguments are checked, the result itself is not needed
        _ = self.test.in_active_ifas
        mock_filter.assert_called_with(deleted_on=None, filter__tests__in=[self.test])

    @patch.object(Test, 'in_active_ifas', [MagicMock(), MagicMock()])
    def test_rename_public_test_to_existing_private_test(self):
        self.test.vetted_on = timezone.now()
        self.test.first_runconfig = RunConfig(name="test_runcfg")

        # The rename target already exists, as a private test with its own first runconfig
        target = Test.objects.create(testsuite=self.ts, name="test2", public=False, vetted_on=timezone.now(),
                                     first_runconfig=RunConfig.objects.create(name="test2_runcfg", temporary=False))

        self.test.rename(target.name)

        # Re-read the target from the database and check it got updated
        target = Test.objects.get(name="test2")
        self.assertTrue(target.public)
        self.assertEqual(target.testsuite, self.test.testsuite)
        self.assertEqual(target.first_runconfig.name, "test2_runcfg")  # This field should not have changed
        self.assertEqual(target.vetted_on, self.test.vetted_on)

        # Every active IFA's filter must now also reference the target
        active_ifas = self.test.in_active_ifas
        self.assertEqual(len(active_ifas), 2)
        for ifa in active_ifas:
            ifa.filter.tests.add.assert_called_with(target)

    @patch.object(Test, 'in_active_ifas', [MagicMock(), MagicMock()])
    def test_rename_test_to_new_bug(self):
        self.test.public = False
        self.test.vetted_on = timezone.now()
        self.test.first_runconfig = RunConfig(name="test_runcfg")

        self.test.rename('test2')

        # Fetch the new test and check the fields got copied
        created = Test.objects.get(name="test2")
        self.assertFalse(created.public)
        self.assertEqual(created.testsuite, self.ts)
        self.assertEqual(created.first_runconfig, None)
        self.assertEqual(created.vetted_on, self.test.vetted_on)

        # Every active IFA's filter must now also reference the new test
        active_ifas = self.test.in_active_ifas
        self.assertEqual(len(active_ifas), 2)
        for ifa in active_ifas:
            ifa.filter.tests.add.assert_called_with(created)

610 

611 

class MachineTagTests(TestCase):
    """Tests for MachineTag: listing of tagged machines and string form."""

    def test_machines(self):
        tag = MachineTag.objects.create(name='tag', public=True)

        # Two machines carry the tag...
        tagged_first = Machine.objects.create(name='machine1', public=True)
        tagged_first.tags.add(tag)

        tagged_second = Machine.objects.create(name='machine2', public=True)
        tagged_second.tags.add(tag)

        # ... and a third one does not
        Machine.objects.create(name='machine3', public=True)

        self.assertEqual(tag.machines, [tagged_first, tagged_second])

    def test_str(self):
        tag = MachineTag(name='tag')
        self.assertEqual(str(tag), "tag")

627 

628 

class MachineTests(TestCase, VettableObjectMixin):
    """Tests for Machine, including the shared vetting tests from VettableObjectMixin."""

    def setUp(self):
        self.machine = Machine.objects.create(name="machine", public=True)
        self.setUpVettableObject(self.machine)

    @patch('CIResults.models.Machine.tags')
    def test_tags_cached(self, tags_cached_mocked):
        # tags_cached is expected to be whatever tags.all() returns
        expected = tags_cached_mocked.all.return_value
        self.assertEqual(self.machine.tags_cached, expected)

    def test__str__(self):
        self.assertEqual(str(self.machine), self.machine.name)

640 

641 

class RunConfigTests(TestCase):
    """Tests for RunConfig: visibility derived from its tags, and filter statistics."""

    def setUp(self):
        self.testsuite = TestSuite.objects.create(name="testsuite", public=True)
        self.s_pass = TextStatus.objects.create(name="pass", testsuite=self.testsuite)
        self.s_fail = TextStatus.objects.create(name="fail", testsuite=self.testsuite)
        self.s_broken = TextStatus.objects.create(name="broken", testsuite=self.testsuite)
        # Only "pass" is an acceptable status for the testsuite
        self.testsuite.acceptable_statuses.add(self.s_pass)

        self.runconfig = RunConfig.objects.create(name="run", temporary=True)

    def test_public_no_tags(self):
        self.assertTrue(self.runconfig.public)

    def test_public_all_public(self):
        first_public = RunConfigTag.objects.create(name="public_tag1", public=True)
        second_public = RunConfigTag.objects.create(name="public_tag2", public=True)

        self.runconfig.tags.add(first_public)
        self.runconfig.tags.add(second_public)
        self.assertTrue(self.runconfig.public)

    def test_public_when_one_tag_is_private(self):
        first_public = RunConfigTag.objects.create(name="public_tag1", public=True)
        second_public = RunConfigTag.objects.create(name="public_tag2", public=True)
        private = RunConfigTag.objects.create(name="private_tag", public=False)

        # A single private tag is enough to make the whole runconfig private
        self.runconfig.tags.add(first_public)
        self.runconfig.tags.add(second_public)
        self.runconfig.tags.add(private)
        self.assertFalse(self.runconfig.public)

    def test_update_statistics(self):
        self.machine = Machine.objects.create(name="machine", public=True)
        self.ts_run = TestsuiteRun.objects.create(testsuite=self.testsuite, runconfig=self.runconfig,
                                                  machine=self.machine, run_id=0,
                                                  start=timezone.now(), duration=datetime.timedelta(hours=1))

        # Four tests: the first two pass, the last two fail
        self.tests = []
        self.testresults = []
        for idx in range(4):
            test = Test.objects.create(name="test{}".format(idx),
                                       testsuite=self.testsuite,
                                       public=True)
            self.tests.append(test)

            status = self.s_fail if idx >= 2 else self.s_pass
            result = TestResult.objects.create(test=test,
                                               ts_run=self.ts_run,
                                               status=status,
                                               start=timezone.now(),
                                               duration=datetime.timedelta(seconds=3))
            self.testresults.append(result)

        self.issue = Issue.objects.create(description="Issue", filer="me@me.de")

        # Create a filter that should not cover anything, and others that cover
        # and match a different count
        self.filters = []
        covers_nothing = IssueFilter.objects.create(description="Covers nothing")
        covers_nothing.statuses.add(self.s_broken)

        test_count = len(self.tests)
        for idx, test in enumerate(self.tests):
            # Filter <idx> covers test <idx> and, when there is one, test <idx + 1>
            issue_filter = IssueFilter.objects.create(description="Filter{}".format(idx))
            issue_filter.tests.add(test)
            if idx + 1 < test_count:
                issue_filter.tests.add(self.tests[idx + 1])
            IssueFilterAssociated.objects.create(filter=issue_filter, issue=self.issue)

        # Try computing statistics as a temporary run first
        stats = self.runconfig.update_statistics()
        self.assertEqual(len(stats), 0)

        # Now check if the statistics match with a non-temporary run
        self.runconfig.temporary = False
        stats = self.runconfig.update_statistics()
        stats.sort(key=lambda stat: stat.filter.id)

        # (matched_count, covered_count) per statistic, in filter-id order
        expected_results = [(0, 2), (1, 2), (2, 2), (1, 1)]
        # NOTE(review): the number of returned statistics is not asserted, so a
        # shorter list would still pass this loop
        for idx, stat in enumerate(stats):
            matched, covered = expected_results[idx]
            self.assertEqual(stat.matched_count, matched)
            self.assertEqual(stat.covered_count, covered)

    # TODO: Test runcfg_history and runcfg_history_offset

721 

722 

class TestSuiteTests(TestCase):
    """Tests for TestSuite's string form and its is_failure() helper."""

    def setUp(self):
        """Store a public testsuite named "testsuite1"."""
        self.testsuite = TestSuite.objects.create(name="testsuite1", public=True)

    def test_str(self):
        """str() of a testsuite is its name."""
        self.assertEqual(str(self.testsuite), "testsuite1")

    def test_is_failure(self):
        """Statuses listed as acceptable are not failures; the other ones are."""
        statuses = [
            TextStatus.objects.create(name=f"status{index}", testsuite=self.testsuite)
            for index in range(4)
        ]
        failing, acceptable = statuses[:2], statuses[2:]

        for status in acceptable:
            self.testsuite.acceptable_statuses.add(status)

        for status in failing:
            self.assertTrue(self.testsuite.is_failure(status))
        for status in acceptable:
            self.assertFalse(self.testsuite.is_failure(status))

746 

747 

class IssueTests(TestCase):
    """Tests for Issue: filter associations, runconfig statistics, archiving, bugs and str()."""

    Model = Issue

    def setUp(self):
        """Create an issue with four filters (index 2 already deleted) and stats for five runconfigs."""
        self.user = get_user_model().objects.create(username='blabla')

        created_on = timezone.now() - datetime.timedelta(seconds=1)
        self.issue = Issue.objects.create(description="blabla",
                                          filer="test@test.de",
                                          added_on=timezone.now())

        self.filters = []
        self.filtersAssoc = []
        for index in range(4):
            # Create a filter...
            issue_filter = IssueFilter.objects.create(description=f"Filter {index}", added_on=created_on)
            self.filters.append(issue_filter)

            # ... and associate it with the issue
            # WARNING: the association at index 2 is closed immediately
            deleted_on = timezone.now() if index == 2 else None
            association = IssueFilterAssociated.objects.create(
                filter=issue_filter,
                issue=self.issue,
                added_on=created_on,
                added_by=self.user,
                deleted_on=deleted_on,
            )
            self.filtersAssoc.append(association)

        # Create multiple runconfigs, added one second apart around created_on.
        # Runconfigs 0-1 have no coverage, 2-4 are covered, and only 3-4 have matches.
        self.runconfigs = []
        one_second = datetime.timedelta(seconds=1)
        for index in range(5):
            runconfig = RunConfig.objects.create(name=f"Runconfig {index}",
                                                 temporary=False,
                                                 added_on=created_on + (index - 2) * one_second)
            self.runconfigs.append(runconfig)

            for issue_filter in self.filters:
                RunFilterStatistic.objects.create(runconfig=runconfig, filter=issue_filter,
                                                  covered_count=0 if index < 2 else 10,
                                                  matched_count=0 if index < 3 else 3)

        self.issue.update_statistics()

    def test_active_filters(self):
        """active_filters excludes the association deleted in setUp, before and after archiving."""
        # All the current associations, except index 2 which was already deleted
        expected = {self.filtersAssoc[0], self.filtersAssoc[1], self.filtersAssoc[3]}
        self.assertEqual(expected, set(self.issue.active_filters))

        # Archiving must not change the result
        self.issue.archive(self.user)
        self.assertEqual(expected, set(self.issue.active_filters))

    def test_runconfigs_covered(self):
        """runconfigs_covered contains runconfigs 2 to 4, before and after archiving."""
        expected = {self.runconfigs[2], self.runconfigs[3], self.runconfigs[4]}
        self.assertEqual(expected, self.issue.runconfigs_covered)

        # Archiving must not change the result
        self.issue.archive(self.user)
        self.assertEqual(expected, self.issue.runconfigs_covered)

    def test_runconfigs_affected(self):
        """runconfigs_affected contains runconfigs 3 and 4, before and after archiving."""
        expected = {self.runconfigs[3], self.runconfigs[4]}
        self.assertEqual(expected, self.issue.runconfigs_affected)

        # Archiving must not change the result
        self.issue.archive(self.user)
        self.assertEqual(expected, self.issue.runconfigs_affected)

    def test_last_seen(self):
        """last_seen is the added_on date of the last runconfig created in setUp."""
        newest_runconfig = self.runconfigs[-1]
        self.assertEqual(self.issue.last_seen, newest_runconfig.added_on)

    def test_failure_rate(self):
        """failure_rate reports 2 affected runs out of 3 covered ones."""
        rate = self.issue.failure_rate
        self.assertAlmostEqual(2 / 3, rate.rate)
        self.assertEqual("2 / 3 runs (66.7%)", str(rate))

    def test_matches(self):
        """Issue.matches() queries each active filter, and none once the issue is archived."""
        # Intercept the calls to CIResults.models.IssueFilter.matches
        with patch('CIResults.models.IssueFilter.matches') as mock_matches:
            mock_matches.return_value = False

            self.issue.matches(None)
            self.assertEqual(mock_matches.call_count, 3)

            # Now archive the issue, and check matches returns False without
            # calling $filter.matches()
            mock_matches.reset_mock()
            self.issue.archive(self.user)
            self.assertFalse(self.issue.matches(None))
            self.assertEqual(mock_matches.call_count, 0)

    def __count_active_filters__(self):
        """Return how many of the issue's associations are not deleted, sanity-checking deleted ones."""
        active = 0
        for association in IssueFilterAssociated.objects.filter(issue=self.issue):
            if association.deleted_on is None:
                active += 1
                continue

            # Deleted associations must have been deleted recently, and been added by our user
            self.assertLess(timezone.now() - association.deleted_on,
                            datetime.timedelta(seconds=10),
                            "The deleted_on time is incorrect")
            self.assertEqual(association.added_by, self.user)
        return active

    def __check_comment_posted(self, comment_on_all_bugs_mock, substring):
        """Assert the mock was called exactly once, with `substring` in its first positional argument."""
        comment_on_all_bugs_mock.assert_called_once()
        positional, _ = comment_on_all_bugs_mock.call_args_list[0]
        self.assertIn(substring, positional[0])

    @patch('CIResults.models.Issue.comment_on_all_bugs')
    def test_archive(self, comment_on_all_bugs_mock):
        """Archiving deletes the active associations, stamps the issue, and comments on the bugs."""
        # Check the default state
        self.assertFalse(self.issue.archived)
        self.assertIsNone(self.issue.archived_on)
        self.assertEqual(self.__count_active_filters__(), 3)

        # Archive the issue
        self.issue.archive(self.user)

        # Check that the filters' association has been updated
        self.assertEqual(self.__count_active_filters__(), 0)

        # Check that the archived_* fields have been updated
        self.assertTrue(self.issue.archived)
        self.assertIsNotNone(self.issue.archived_on)
        self.assertLess(timezone.now() - self.issue.archived_on,
                        datetime.timedelta(seconds=1),
                        "The archived_on time is incorrect")
        self.assertEqual(self.issue.archived_by, self.user)

        # Check that we posted a comment on the bugs associated
        self.__check_comment_posted(comment_on_all_bugs_mock, "archived")

        # Check that archiving the issue again generates an error
        with self.assertRaisesMessage(ValueError, "The issue is already archived"):
            self.issue.archive(self.user)

    @patch('CIResults.models.Issue.comment_on_all_bugs')
    def test_restore(self, comment_on_all_bugs_mock):
        """Restoring clears the archived_* fields, comments on the bugs, and cannot be done twice."""
        # Mark the issue as archived
        # TODO: Load a fixture with an archived issue and an unknown failure
        self.issue.archived_on = timezone.now()

        # Restore the issue before checking the restoration process
        self.issue.restore()

        # Check that the filters' association has been updated
        self.assertEqual(self.__count_active_filters__(), 3)

        # Check that the archived_* fields have been reset
        self.assertFalse(self.issue.archived)
        self.assertIsNone(self.issue.archived_on)
        self.assertIsNone(self.issue.archived_by)

        # TODO: Make sure the unknown failure became a known failure, associated
        # to this issue. To be done after migrating to the fixture

        # Check that we posted a comment on the bugs associated
        self.__check_comment_posted(comment_on_all_bugs_mock, "restored")

        # Check that restoring the issue again generates an error
        with self.assertRaisesMessage(ValueError, "The issue is not currently archived"):
            self.issue.restore()

    def test_set_bugs(self):
        """set_bugs() replaces the issue's bugs, and is refused once the issue is archived."""
        # Create some bugs
        tracker = BugTracker.objects.create(name="Tracker", tracker_type="jira_untracked", public=True)
        bugs = [Bug(tracker=tracker, bug_id=str(index), title=f"bug {index}") for index in range(5)]

        # Add some bugs
        initial_bugs = bugs[2:4]
        self.issue.set_bugs(initial_bugs)
        self.assertEqual(set(self.issue.bugs_cached), set(initial_bugs))

        # Now try to update the bugs
        updated_bugs = bugs[0:3]
        self.issue.set_bugs(updated_bugs)
        self.assertEqual(set(self.issue.bugs_cached), set(updated_bugs))

        # Archive the issue, and check that updating the bugs raises an error
        self.issue.archive(self.user)
        with self.assertRaisesMessage(ValueError, "The issue is archived, and thus read-only"):
            self.issue.set_bugs([])

    def test___filter_add__(self):
        """__filter_add__() creates exactly one association attributed to the given user."""
        new_filter = baker.make(IssueFilter)
        self.issue.__filter_add__(new_filter, self.user)

        associations = IssueFilterAssociated.objects.filter(filter=new_filter, issue=self.issue,
                                                            added_by=self.user)
        self.assertEqual(associations.count(), 1)

    @patch("CIResults.models.timezone.now")
    def test__assign_to_known_failures(self, timezone_now_mock):
        """Unknown failures become known failures stamped with the (mocked) current time."""
        default_tz = timezone.get_default_timezone()

        # The failures get created while "now" is the 1st of January...
        date_reported = timezone.make_aware(datetime.datetime(2024, 1, 1), default_tz)
        timezone_now_mock.return_value = date_reported

        unknown_failures = baker.make(UnknownFailure, _quantity=3)
        ifa = baker.make(IssueFilterAssociated)

        # ... and get assigned to the issue while "now" is one day later
        date_now = timezone.make_aware(datetime.datetime(2024, 1, 2), default_tz)
        timezone_now_mock.return_value = date_now
        known_failures = self.issue._assign_to_known_failures(unknown_failures, ifa)

        self.assertEqual(len(known_failures), 3)
        self.assertEqual(UnknownFailure.objects.count(), 0)
        expected_delay = date_now - date_reported
        for known_failure in known_failures:
            self.assertEqual(known_failure.manually_associated_on, date_now)
            self.assertEqual(known_failure.filing_delay, expected_delay)

    def test_replace_filter(self):
        """replace_filter() closes the old association and opens one for the new filter."""
        old_filter = IssueFilter.objects.create(description="old filter")
        new_filter = IssueFilter.objects.create(description="new filter")

        def deleted_count(issue_filter):
            return IssueFilterAssociated.objects.exclude(deleted_on=None).filter(filter=issue_filter).count()

        # Add the old filter once before deleting it
        self.issue.set_filters([old_filter], self.user)
        self.issue.set_filters([], self.user)

        self.assertEqual(deleted_count(old_filter), 1)

        # Re-add the old filter, then replace it with the new one
        self.issue.set_filters([old_filter], self.user)
        self.issue.replace_filter(old_filter, new_filter, self.user)

        self.assertEqual(deleted_count(old_filter), 2)
        self.assertEqual(IssueFilterAssociated.objects.filter(deleted_on=None, filter=new_filter).count(), 1)

        # Archive the issue, and check that replacing the filter raises an error
        self.issue.archive(self.user)
        with self.assertRaisesMessage(ValueError, "The issue is archived, and thus read-only"):
            self.issue.replace_filter(new_filter, old_filter, self.user)

    def test_set_filters(self):
        """set_filters() updates the active filters while keeping the history of associations."""
        def active():
            return {assoc.filter for assoc in self.issue.active_filters}

        def everything():
            return {assoc.filter for assoc in self.issue.all_filters}

        # Check the current set of filters
        self.assertEqual(active(), {self.filters[0], self.filters[1], self.filters[3]})
        self.assertEqual(everything(), set(self.filters))

        # Now try to update the filters
        self.issue.set_filters(self.filters[0:3], self.user)
        self.assertEqual(active(), set(self.filters[0:3]))
        self.assertEqual(everything(), set(self.filters))

        # Check that the deleted_on field has been updated for the filter 3
        expected_len = [1, 1, 2, 1]
        expected_deleted_none = [True, True, False, False]
        for index, issue_filter in enumerate(self.filters):
            db_assocs = IssueFilterAssociated.objects.filter(filter=issue_filter, issue=self.issue)
            self.assertEqual(len(db_assocs), expected_len[index], index)
            if len(db_assocs) > 0:
                self.assertEqual(db_assocs[0].deleted_on is None, expected_deleted_none[index], index)

        # Archive the issue, and check that updating the filters raises an error
        self.issue.archive(self.user)
        with self.assertRaisesMessage(ValueError, "The issue is archived, and thus read-only"):
            self.issue.set_filters([], self.user)

    def test_str(self):
        """str() depends on the number of bugs attached: none, exactly one, or several."""
        self.assertEqual(str(self.issue), "Issue: <empty>")

        tracker = BugTracker(name="Freedesktop.org", short_name="fdo",
                             separator="#", public=True,
                             url="https://bugs.freedesktop.org/",
                             bug_base_url="https://bugs.freedesktop.org/show_bug.cgi?id=")
        tracker.save()
        first_bug = Bug.objects.create(tracker=tracker, bug_id="1234", title="random title")
        second_bug = Bug.objects.create(tracker=tracker, bug_id="1235", title="random title")

        self.issue.bugs.add(first_bug)
        self.assertEqual(str(self.issue), "Issue: fdo#1234 - random title")

        self.issue.bugs.add(second_bug)
        self.assertEqual(str(self.issue), "Issue: [fdo#1234, fdo#1235]")

1021 

1022 

class IssueFilterAssociatedTests(TestCase):
    """Tests for IssueFilterAssociated.delete()."""

    def setUp(self):
        """Create the filter and the issue the associations under test are linking."""
        self.filter = IssueFilter.objects.create(description="Filter")
        self.issue = Issue.objects.create(filer="m@x.org")

    def test_delete(self):
        """delete() records who deleted the association and when, only the first time it is called."""
        epoch = datetime.datetime.fromtimestamp(0, tz=pytz.utc)
        later = datetime.datetime.fromtimestamp(1, tz=pytz.utc)

        # Create an association and check that the field deleted_on is not set
        assoc = IssueFilterAssociated(filter=self.filter, issue=self.issue)
        self.assertIsNone(assoc.deleted_on)
        self.assertIsNone(assoc.id)

        # Delete the association, and verify that it actually got saved to the DB
        # by checking if the id has been set
        user = get_user_model().objects.create(username='blabla')
        assoc.delete(user, epoch)
        self.assertEqual(assoc.deleted_by, user)
        self.assertIsNotNone(assoc.id)
        self.assertEqual(assoc.deleted_on, epoch)

        # Try deleting again with a new timestamp, and check that it did not change.
        # The timestamp is the second argument: the user must still come first,
        # otherwise the datetime would be taken for the user.
        assoc.delete(user, later)
        self.assertEqual(assoc.deleted_on, epoch)

        # Test what happens when we omit the delete's now argument
        assoc = IssueFilterAssociated(filter=self.filter, issue=self.issue)
        assoc.delete(user)
        self.assertLess(abs(timezone.now() - assoc.deleted_on), datetime.timedelta(seconds=1))
        self.assertEqual(assoc.deleted_by, user)

1051 

1052 # TODO: Test all the statistics! 

1053 

1054 

class TextStatusTests(TestCase, VettableObjectMixin):
    """Tests for TextStatus: color, failure/notrun classification, severity and str()."""

    def setUp(self):
        """Create a testsuite with a plain status, an acceptable "pass" status and a "notrun" status."""
        self.testsuite = TestSuite.objects.create(name="testsuite", public=True)

        def make_status(name):
            return TextStatus.objects.create(testsuite=self.testsuite, name=name)

        self.status = make_status("status")
        self.setUpVettableObject(self.status)

        self.pass_status = make_status("pass")
        self.testsuite.acceptable_statuses.add(self.pass_status)

        self.notrun = make_status("notrun")
        self.testsuite.notrun_status = self.notrun

    def test_color__with_specified_color(self):
        """An explicit color_hex is returned as-is by color."""
        status = TextStatus(color_hex="#123456")
        self.assertEqual(status.color, "#123456")

    def test_color__default(self):
        """Without color_hex, color is expected to be "#e9be2c"."""
        status = TextStatus()
        self.assertEqual(status.color, "#e9be2c")

    def test_is_failure(self):
        """Only the status that is not in the acceptable statuses is a failure."""
        self.assertTrue(self.status.is_failure)
        self.assertFalse(self.pass_status.is_failure)

    def test_is_notrun(self):
        """Only the testsuite's notrun_status is reported as notrun."""
        self.assertTrue(self.notrun.is_notrun)
        self.assertFalse(self.status.is_notrun)

    def test_actual_severity(self):
        """An explicit severity wins; otherwise notrun, pass and status are expected to rank 0, 1 and 2."""
        self.assertEqual(TextStatus(severity=42).actual_severity, 42)

        expectations = ((self.notrun, 0), (self.pass_status, 1), (self.status, 2))
        for status, severity in expectations:
            self.assertEqual(status.actual_severity, severity)

    def test_str(self):
        """str() combines the testsuite name and the status name."""
        self.assertEqual(str(self.status), "testsuite: status")

1090 

1091 

class IssueFilterTests(TestCase):
    """Tests for IssueFilter.covers()/matches(), the user-query conversion, replace() and machines_cached."""

    def setUp(self):
        """Create 4 tags/machines/tests/statuses, 3 runconfigs, and a result per (test, run, status)."""
        self.tag = []
        self.runconfigs = []
        self.machine = []
        self.test = []
        self.status = []
        self.ts_run = []
        self.testresult = []

        self.testsuite = TestSuite(name="testsuite1", public=True)
        self.testsuite.save()

        # Create 4 instances of every basic object
        for index in range(4):
            tag = RunConfigTag(public=True, name=f"tag{index}")
            tag.save()
            self.tag.append(tag)

            machine = Machine(name=f"machine{index}", public=True)
            machine.save()
            self.machine.append(machine)

            test = Test(name=f"test{index}", testsuite=self.testsuite, public=True)
            test.save()
            self.test.append(test)

            status = TextStatus(name=f"status{index}", testsuite=self.testsuite)
            status.save()
            self.status.append(status)

        # Tell which results are acceptable for the testsuite
        for acceptable in (self.status[1], self.status[2]):
            self.testsuite.acceptable_statuses.add(acceptable)

        # Create the runconfigs and their (not yet saved) testsuite runs
        for index in range(3):
            runconfig = RunConfig(name=f"runconfig{index}", temporary=False)
            runconfig.save()

            # runconfig0 gets no tag, runconfig1 gets tag0 and runconfig2 gets tag1
            if index > 0:
                runconfig.tags.add(self.tag[index - 1])

            # Prepare a testsuite run for all machines
            for machine in self.machine:
                pending_run = TestsuiteRun(
                    testsuite=self.testsuite,
                    runconfig=runconfig,
                    machine=machine,
                    run_id=0,
                    start="2023-12-29 12:00",
                    duration=datetime.timedelta(days=1),
                )
                self.ts_run.append(pending_run)
            self.runconfigs.append(runconfig)

        # Create the test results: the second line of every log embeds the status name
        for test in self.test:
            for ts_run in self.ts_run:
                ts_run.save()
                for status in self.status:
                    test_result = TestResult(
                        test=test,
                        ts_run=ts_run,
                        status=status,
                        stdout=f"h\n{status.name} stdout1234\nYoo!",
                        stderr=f"h\n{status.name} stderr1234\nasdf",
                        dmesg=f"h\n{status.name} dmesg1234\nqwer",
                        start="2023-12-29 12:00",
                        duration=datetime.timedelta(days=1),
                    )
                    test_result.save()
                    self.testresult.append(test_result)

        self.filter = IssueFilter()
        self.filter.save()

    def test_empty(self):
        """A filter without any criterion covers and matches every result, with an empty query."""
        for testresult in self.testresult:
            self.assertTrue(self.filter.covers(testresult))
            self.assertTrue(self.filter.matches(testresult))

        # Test the conversion to user filters
        expected = ""
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), expected)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), expected)

    def test_runconfig_tag_only(self):
        """With only runconfig tags set, results are covered/matched when their runconfig has one of them."""
        for tag in (self.tag[0], self.tag[2]):
            self.filter.tags.add(tag)

        filter_tags = {self.tag[0].id, self.tag[2].id}
        for testresult in self.testresult:
            runconfig_tags = {tag.id for tag in testresult.ts_run.runconfig.tags.all()}
            should_match = bool(filter_tags & runconfig_tags)
            self.assertTrue(self.filter.covers(testresult) == should_match)
            self.assertTrue(self.filter.matches(testresult) == should_match)

        # Test the conversion to user filters
        expected = 'runconfig_tag IS IN ["tag0", "tag2"]'
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), expected)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), "")

    def test_machine_and_machine_tags(self):
        """With a machine tag set, only the results of the machines carrying that tag are covered/matched."""
        tag1 = MachineTag.objects.create(name="Tag1", public=True)
        tagged_machines = (self.machine[0], self.machine[2])
        for machine in tagged_machines:
            machine.tags.add(tag1)

        self.filter.machine_tags.add(tag1)

        for testresult in self.testresult:
            should_match = testresult.ts_run.machine in tagged_machines
            self.assertTrue(self.filter.covers(testresult) == should_match)
            self.assertTrue(self.filter.matches(testresult) == should_match)

        # Test the conversion to user filters
        expected = 'machine_tag IS IN ["Tag1"]'
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), expected)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), "")

    def test_machine_tag_only(self):
        """A machine tag that no machine carries neither covers nor matches anything."""
        # Test that if a user asks for a tag that contains no machines, we do not match anything
        tag1 = MachineTag.objects.create(name="Tag1", public=True)
        self.filter.machine_tags.add(tag1)

        for testresult in self.testresult:
            self.assertFalse(self.filter.covers(testresult))
            self.assertFalse(self.filter.matches(testresult))

        # Test the conversion to user filters
        expected = 'machine_tag IS IN ["Tag1"]'
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), expected)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), "")

    def test_machine_only(self):
        """With only machines set, only the results of these machines are covered/matched."""
        selected_machines = (self.machine[0], self.machine[2])
        for machine in selected_machines:
            self.filter.machines.add(machine)

        for testresult in self.testresult:
            should_match = testresult.ts_run.machine in selected_machines
            self.assertTrue(self.filter.covers(testresult) == should_match)
            self.assertTrue(self.filter.matches(testresult) == should_match)

        # Test the conversion to user filters, using the order machines_cached yields
        machine_names = [machine.name for machine in self.filter.machines_cached]
        expected = 'machine_name IS IN ["{}", "{}"]'.format(machine_names[0], machine_names[1])
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), expected)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), "")

    def test_test_only(self):
        """With only tests set, only the results of these tests are covered/matched."""
        # Create another test from another testsuite
        testsuite2 = TestSuite.objects.create(name="testsuite2", public=True)
        test_ts2 = Test.objects.create(name="test-ts2", testsuite=testsuite2, public=True)

        self.filter.tests.add(self.test[0], self.test[2], test_ts2)

        selected_tests = (self.test[0], self.test[2])
        for testresult in self.testresult:
            should_match = testresult.test in selected_tests
            self.assertTrue(self.filter.covers(testresult) == should_match)
            self.assertTrue(self.filter.matches(testresult) == should_match)

        # Generate all the valid queries that could be generated: both the order
        # of the testsuites and the order of the test names may vary
        q1 = '(testsuite_name = "testsuite1" AND test_name IS IN ["{}", "{}"])'
        q2 = '(testsuite_name = "testsuite2" AND test_name IS IN ["test-ts2"])'
        query_pattern = '({} OR {})'
        query_opts = []
        for names in (("test0", "test2"), ("test2", "test0")):
            ts1_query = q1.format(*names)
            query_opts.append(query_pattern.format(ts1_query, q2))
            query_opts.append(query_pattern.format(q2, ts1_query))

        # Test the conversion to user filters
        self.assertIn(self.filter.equivalent_user_query, query_opts)
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), "")
        self.assertIn(self.filter._to_user_query(covers=True, matches=False), query_opts)

    def test_results_only(self):
        """With only statuses set, every result is covered but only these statuses match."""
        # Create another status from another testsuite
        testsuite2 = TestSuite.objects.create(name="testsuite2", public=True)
        status_ts2 = TextStatus.objects.create(name="status-ts2", testsuite=testsuite2)

        self.filter.statuses.add(self.status[0], self.status[2], status_ts2)

        selected_statuses = (self.status[0], self.status[2])
        for testresult in self.testresult:
            should_match = testresult.status in selected_statuses
            self.assertTrue(self.filter.covers(testresult))
            self.assertTrue(self.filter.matches(testresult) == should_match, testresult)

        # Generate all the valid queries that could be generated: both the order
        # of the testsuites and the order of the status names may vary
        q1 = '(testsuite_name = "testsuite1" AND status_name IS IN ["{}", "{}"])'
        q2 = '(testsuite_name = "testsuite2" AND status_name IS IN ["status-ts2"])'
        query_pattern = '({} OR {})'
        query_opts = []
        for names in (("status0", "status2"), ("status2", "status0")):
            ts1_query = q1.format(*names)
            query_opts.append(query_pattern.format(ts1_query, q2))
            query_opts.append(query_pattern.format(q2, ts1_query))

        # Test the conversion to user filters
        self.assertIn(self.filter.equivalent_user_query, query_opts)
        self.assertIn(self.filter._to_user_query(covers=False, matches=True), query_opts)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), "")

    def test_escaping_of_single_quote(self):
        """Single quotes found in a regular expression get escaped in the user query."""
        self.filter.stdout_regex = r"test's log"

        # Test the conversion to user filters and ensure that the single quote is escaped
        expected = r"stdout ~= 'test\'s log'"
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), expected)

    def test_stdout_only(self):
        """A stdout regex does not restrict coverage, and only matches results whose stdout fits it."""
        # NOTE(review): setUp writes "<status name> stdout1234" with status names
        # "status0".."status3", so no line equals "result1 ..."/"result2 ..." and
        # should_match is False for every result. Confirm whether "status[12]" was
        # intended, so that the positive case gets exercised too.
        self.filter.stdout_regex = r"result[12] stdout\d+"
        matching_lines = ("result1 stdout1234", "result2 stdout1234")
        for testresult in self.testresult:
            stdout_line = testresult.stdout.split('\n')[1]
            should_match = stdout_line in matching_lines
            self.assertTrue(self.filter.covers(testresult))
            self.assertTrue(self.filter.matches(testresult) == should_match, stdout_line)

        # Test the conversion to user filters
        expected = r"stdout ~= 'result[12] stdout\d+'"
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), "")
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), expected)

    def test_stderr_only(self):
        """A stderr regex does not restrict coverage, and only matches results whose stderr fits it."""
        # NOTE(review): setUp writes "<status name> stderr1234" with status names
        # "status0".."status3", so should_match is False for every result. Confirm
        # whether "status[12]" was intended.
        self.filter.stderr_regex = r"result[12] stderr\d+"
        matching_lines = ("result1 stderr1234", "result2 stderr1234")
        for testresult in self.testresult:
            stderr_line = testresult.stderr.split('\n')[1]
            should_match = stderr_line in matching_lines
            self.assertTrue(self.filter.covers(testresult))
            self.assertTrue(self.filter.matches(testresult) == should_match, stderr_line)

        # Test the conversion to user filters
        expected = r"stderr ~= 'result[12] stderr\d+'"
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), "")
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), expected)

    def test_dmesg_only(self):
        """A dmesg regex does not restrict coverage, and only matches results whose dmesg fits it."""
        # NOTE(review): setUp writes "<status name> dmesg1234" with status names
        # "status0".."status3", so should_match is False for every result. Confirm
        # whether "status[12]" was intended.
        self.filter.dmesg_regex = r"result[12] dmesg\d+"
        matching_lines = ("result1 dmesg1234", "result2 dmesg1234")
        for testresult in self.testresult:
            dmesg_line = testresult.dmesg.split('\n')[1]
            should_match = dmesg_line in matching_lines
            self.assertTrue(self.filter.covers(testresult))
            self.assertTrue(self.filter.matches(testresult) == should_match, dmesg_line)

        # Test the conversion to user filters
        expected = r"dmesg ~= 'result[12] dmesg\d+'"
        self.assertEqual(self.filter.equivalent_user_query, expected)
        self.assertEqual(self.filter._to_user_query(covers=True, matches=False), "")
        self.assertEqual(self.filter._to_user_query(covers=False, matches=True), expected)

    def test_user_query_filter(self):
        """A filter defined by a user query on machine names covers/matches these machines only."""
        self.filter.user_query = f'machine_name IS IN ["{self.machine[0]}", "{self.machine[2]}"]'

        selected_machines = (self.machine[0], self.machine[2])
        for testresult in self.testresult:
            should_match = testresult.ts_run.machine in selected_machines
            self.assertTrue(self.filter.covers(testresult) == should_match)
            self.assertTrue(self.filter.matches(testresult) == should_match)

    def test_replace(self):
        """replace() moves the active associations to the new filter and hides the old one."""
        user = get_user_model().objects.create(username='blabla')

        def assoc_count(**criteria):
            return IssueFilterAssociated.objects.filter(**criteria).count()

        # Associate the filter to three issues, the second one being archived
        issues = []
        for index in range(3):
            issue = Issue.objects.create(description="")
            issue.set_filters([self.filter], user)
            issues.append(issue)
            if index == 1:
                issue.archive(user)

        new_filter = IssueFilter.objects.create(description="new filter")

        self.assertEqual(assoc_count(filter=self.filter), 3)
        self.assertEqual(assoc_count(filter=self.filter, deleted_by=user), 1)
        self.assertEqual(assoc_count(deleted_on=None, filter=self.filter), 2)
        self.assertEqual(assoc_count(filter=new_filter), 0)
        self.assertFalse(self.filter.hidden)

        self.filter.replace(new_filter, user)

        self.assertEqual(assoc_count(filter=self.filter), 3)
        self.assertEqual(assoc_count(deleted_on=None, filter=self.filter), 0)
        self.assertEqual(assoc_count(filter=new_filter), 2)
        self.assertEqual(assoc_count(filter=self.filter, deleted_by=user), 3)
        self.assertTrue(self.filter.hidden)

    # Verify that all the machines specified by tags are listed by machines_cached
    def test_machines_cached(self):
        """machines_cached lists both the explicit machines and the ones carrying a selected tag."""
        # Create a machine tag and tag the machines 1 and 3 with it
        tag1 = MachineTag.objects.create(name="TAG1", public=True)
        for machine in (self.machine[1], self.machine[3]):
            machine.tags.add(tag1)

        # Now add tag1 to the filter's machine tags, and machine 0 as an explicit machine
        self.filter.machine_tags.add(tag1)
        self.filter.machines.add(self.machine[0])

        # Now check that the filter lists the machines 0, 1 and 3
        expected_machines = {self.machine[0], self.machine[1], self.machine[3]}
        self.assertEqual(self.filter.machines_cached, expected_machines)

1405 

1406 

class RateTests(TestCase):
    """Tests for the Rate helper."""

    def test_rate(self):
        """rate is the ratio of the two counts, and 0 when both counts are 0."""
        half = Rate('', 1, 2)
        self.assertAlmostEqual(half.rate, 0.5)

        empty = Rate('', 0, 0)
        self.assertEqual(empty.rate, 0)

    def test_str(self):
        """str() shows both counts, the label and the percentage."""
        rate = Rate('tests', 1, 2)
        self.assertEqual(str(rate), '1 / 2 tests (50.0%)')

1414 

1415 

class KnownFailureTests(TestCase):
    """Tests for KnownFailure's covered-runconfig counters."""

    def test_covered_runconfigs_since(self):
        """Check the covered runconfig counts reported since the failure, per issue and per filter."""
        # Create a list of runconfigs that will be used as the "runconfigs_covered"
        runconfigs = [RunConfig.objects.create(name=f"Runconfig{index}", temporary=True)
                      for index in range(6)]

        # Create a failure linked to an Issue with all the runconfigs as covered, and the ifa's
        # covered runconfigs are all but the last one. Associate the runconfig index 1 as the runconfig
        # where the failure happened
        failure = KnownFailure()
        failing_run = TestsuiteRun(runconfig=runconfigs[1])
        failure.result = TestResult(ts_run=failing_run)
        failure.matched_ifa = IssueFilterAssociated(issue=Issue())
        failure.matched_ifa.runconfigs_covered = runconfigs[:-1]
        failure.matched_ifa.issue.runconfigs_covered = runconfigs

        # Check that when the runconfig is not found, we return None
        unknown_runconfig = RunConfig()
        self.assertEqual(KnownFailure._runconfig_index(runconfigs, unknown_runconfig), None)

        # Check the number of covered runconfigs since the failure happened
        self.assertEqual(failure.covered_runconfigs_since_for_issue, 4)  # len(issue.runconfigs_covered) - 2
        self.assertEqual(failure.covered_runconfigs_since_for_filter, 3)  # len(ifa.runconfigs_covered) - 2

1438 

1439 

class UnknownFailureTests(TestCase):
    """Tests for the derived properties and the string form of UnknownFailure."""

    @patch('CIResults.models.UnknownFailure.matched_archived_ifas')
    def test_matched_archived_ifas_cached(self, matched_archived_ifas_mocked):
        # The cached property is expected to equal what matched_archived_ifas.all() returns
        failure = UnknownFailure()
        cached = failure.matched_archived_ifas_cached
        self.assertEqual(cached, matched_archived_ifas_mocked.all.return_value)

    def test_matched_issues(self):
        failure = UnknownFailure()

        # Pre-seed the cached associations with two mocked IssueFilterAssociated objects
        first_ifa = MagicMock(spec=IssueFilterAssociated)
        second_ifa = MagicMock(spec=IssueFilterAssociated)
        failure.matched_archived_ifas_cached = [first_ifa, second_ifa]

        # The matched issues are expected to be the set of issues of these associations
        issues = failure.matched_issues
        self.assertEqual(issues, {e.issue for e in failure.matched_archived_ifas_cached})

    @patch('CIResults.models.UnknownFailure.result')
    def test_str(self, results_mocked):
        # The failure is expected to render exactly like its (mocked) result
        failure = UnknownFailure()
        rendered = str(failure)
        self.assertEqual(rendered, str(failure.result))

1456 

1457 

class RunFilterStatisticTests(TestCase):
    """Tests for the string representation of RunFilterStatistic."""

    def test_str(self):
        issue_filter = IssueFilter(description="My filter")
        runconfig = RunConfig(name='RunConfig')

        # Each entry is (covered_count, matched_count, expected string)
        cases = [
            (0, 0, 'My filter on RunConfig: match rate 0/0 (0.00%)'),
            (10, 5, 'My filter on RunConfig: match rate 5/10 (50.00%)'),
        ]
        for covered, matched, expected in cases:
            stat = RunFilterStatistic(runconfig=runconfig, filter=issue_filter,
                                      covered_count=covered, matched_count=matched)
            self.assertEqual(str(stat), expected)