Coverage for CIResults/tests/test_filtering.py: 100%

238 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 13:11 +0000

1from unittest.mock import patch 

2from django.test import TestCase 

3from django.test.client import RequestFactory 

4from django.db.models import Q 

5 

6from CIResults.models import Issue, Bug, TestsuiteRun, TestResult, KnownFailure 

7from CIResults.filtering import UserFiltrableMixin, FilterObjectStr, FilterObjectBool, FilterObject, LegacyParser 

8from CIResults.filtering import QueryParser, FilterObjectInteger, FilterObjectDateTime, FilterObjectDuration 

9from CIResults.filtering import FilterObjectModel, FilterObjectJSON 

10from CIResults.filtering import QueryCreator, QueryVisitor 

11from CIResults.models import Machine 

12from shortener.models import Shortener 

13 

14import datetime 

15import pytz 

16 

17 

class UserFiltrableTestsMixin:
    """Shared check that each user filter of ``self.Model`` maps to a usable DB lookup.

    Concrete test cases combine this mixin with ``TestCase`` and set the
    ``Model`` class attribute to the class whose ``filter_objects_to_db``
    mapping should be checked.
    """

    def test_filter_objects_to_db(self):
        """Build one queryset filter per entry of ``Model.filter_objects_to_db``.

        The test fails, naming the offending field, as soon as building a
        filter raises.

        Raises:
            ValueError: if the test class has no ``Model`` attribute, or if
                the model has no ``filter_objects_to_db`` attribute.
        """
        # Abort if the class does not have a model
        if not hasattr(self, 'Model'):
            raise ValueError("The class '{}' does not have a 'Model' attribute".format(self))  # pragma: no cover

        # Abort if the object does not have the filter_objects_to_db attribute
        if not hasattr(self.Model, 'filter_objects_to_db'):
            raise ValueError("The model '{}' does not have a 'filter_objects_to_db' "
                             "attribute".format(self.Model))  # pragma: no cover

        # Build the filters with USE_TZ=False to ignore the naive datetime warning.
        # NOTE(review): filter() returns a lazy queryset that is never evaluated here,
        # so this presumably only validates the lookup path and value - confirm that
        # this is the intended level of checking.
        with self.settings(USE_TZ=False):
            for field_name, db_obj in self.Model.filter_objects_to_db.items():
                if isinstance(db_obj, FilterObjectModel):
                    filter_name = '{}__in'.format(db_obj.db_path)
                    value = db_obj.model.objects.none()
                else:
                    if isinstance(db_obj, FilterObjectJSON):
                        # The key is assigned before db_path is read (presumably db_path
                        # depends on it). NOTE: this mutates the filter object stored on
                        # the model's class-level mapping.
                        db_obj.key = 'key'
                    filter_name = '{}__exact'.format(db_obj.db_path)
                    value = db_obj.test_value

                try:
                    self.Model.objects.filter(**{filter_name: value})
                except Exception as e:  # pragma: no cover
                    self.fail("Class {}'s field '{}' is not working: {}.".format(self.Model,
                                                                                  field_name,
                                                                                  str(e)))  # pragma: no cover

49 

50 

class BugTests(TestCase, UserFiltrableTestsMixin):
    """Runs the shared ``filter_objects_to_db`` check against the ``Bug`` model."""

    Model = Bug

53 

54 

class IssueTests(TestCase, UserFiltrableTestsMixin):
    """Runs the shared ``filter_objects_to_db`` check against the ``Issue`` model."""

    Model = Issue

57 

58 

class TestsuiteRunTests(TestCase, UserFiltrableTestsMixin):
    """Runs the shared ``filter_objects_to_db`` check against the ``TestsuiteRun`` model."""

    Model = TestsuiteRun

61 

62 

class TestResultTests(TestCase, UserFiltrableTestsMixin):
    """Runs the shared ``filter_objects_to_db`` check against the ``TestResult`` model."""

    Model = TestResult

65 

66 

class KnownFailureTests(TestCase, UserFiltrableTestsMixin):
    """Runs the shared ``filter_objects_to_db`` check against the ``KnownFailure`` model."""

    Model = KnownFailure

69 

70 

class QueryVisitorTests(TestCase):
    """Checks ``QueryVisitor.get_related_model()`` with ``KnownFailure`` as the starting model."""

    def test_get_related_model(self):
        """The name 'result' on ``KnownFailure`` resolves to ``TestResult``."""
        related_model = QueryVisitor(KnownFailure).get_related_model("result")
        self.assertEqual(related_model, TestResult)

    def test_get_related_model_no_attribute(self):
        """An unknown name raises ``AttributeError`` with an explicit message."""
        visitor = QueryVisitor(KnownFailure)
        expected_message = "'KnownFailure' has no attribute 'status'"
        with self.assertRaisesMessage(AttributeError, expected_message):
            visitor.get_related_model("status")

80 

81 

class QueryParserTests(TestCase):
    """Tests for ``QueryParser``.

    Most tests pass the test case instance itself as the parser's "model":
    ``setUp`` gives it a ``filter_objects_to_db`` mapping, which is presumably
    the only attribute the parser reads from it for these queries.
    """

    class NestedModel:
        """Stand-in model used as the target of ``MATCHES (...)`` sub-queries."""

        class Object:
            """Fake manager that records the Q object it was asked to filter with."""

            # Last Q object received by filter(); kept on the class so tests can read it back
            q_object = None

            def filter(self, q_object):
                QueryParserTests.NestedModel.Object.q_object = q_object
                return Bug.objects.none()

        objects = Object()

        filter_objects_to_db = {
            "nested1": FilterObjectInteger('nested_db__1'),
            "nested2": FilterObjectStr('nested_db__2'),
        }

    def setUp(self):
        # The recorded sub-query lives on the class: reset it so that a test can
        # never observe the value left behind by a previous test.
        self.NestedModel.Object.q_object = None

        self.filter_objects_to_db = {
            "user_abc": FilterObjectInteger('db__abc'),
            "user_def": FilterObjectStr('db__def'),
            "user_ghi": FilterObjectDateTime('db__ghi'),
            "user_jkl": FilterObjectDuration('db__jkl'),
            "user_mno": FilterObjectBool('db__mno'),
            "user_pqr": FilterObjectDateTime('db__pqr'),
            "user_nes": FilterObjectModel(self.NestedModel, "db__nes"),
            "user_json": FilterObjectJSON('db__json'),
        }

    def test_empty_query(self):
        parser = QueryParser(self, "")
        self.assertTrue(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, None)

    def test_unknown_object_name(self):
        parser = QueryParser(self, "hello = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The object 'hello' does not exist")

    def test_key_with_double_underscore(self):
        parser = QueryParser(self, "user_json.toto__tata = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "Dict object keys cannot contain the substring '__'")

    def test_two_keys_on_keyed_object(self):
        parser = QueryParser(self, "user_json.toto.tata = 'world'")

        # NOTE(review): unlike the sibling tests, the exact error message is not checked here
        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())

    def test_no_key_on_keyed_object(self):
        parser = QueryParser(self, "user_json = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The dict object 'user_json' requires a key to access its data")

    def test_key_on_non_keyed_object(self):
        parser = QueryParser(self, "user_pqr.toto = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The object 'user_pqr' cannot have an associated key")

    def test_invalid_syntax(self):
        parser = QueryParser(self, "hello = 'world")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "Expected ''' at position (1, 15) => 'o = 'world*'.")

    @patch('django.utils.timezone.now',
           return_value=datetime.datetime.strptime('2019-01-01T00:00:05', "%Y-%m-%dT%H:%M:%S"))
    def test_parsing_all_types(self, now_mocked):
        # Every fragment but the last ends with a space so that the implicit string
        # concatenation never glues a value to the following AND keyword.
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' AND user_ghi =datetime(2019-02-01) "
                                   "AND user_jkl = duration(00:00:03) AND user_jkl > ago(00:00:05) "
                                   "AND user_mno = TRUE AND user_pqr = NONE AND user_json.foo_bar = 'bar' "
                                   "AND user_json.foo_bar2 = 42")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        # HACK: Using 'children' attribute and set() here because the ordering was different, for some reason, between
        # Q objects, which caused direct comparison to fail.
        self.assertEqual(set(parser.q_objects.children),
                         set(Q(db__abc__exact=123, db__def__exact='HELLO', db__mno__exact=True,
                               db__ghi__exact=FilterObjectDateTime.parse_value('2019-02-01'),
                               db__jkl__exact=datetime.timedelta(seconds=3),
                               # ago(00:00:05) is relative to the mocked "now" (00:00:05)
                               db__jkl__gt=datetime.datetime.strptime('2019-01-01', "%Y-%m-%d"),
                               db__pqr__exact=None, db__json__foo_bar__exact='bar',
                               db__json__foo_bar2__exact=42).children))

    def test_integer_lookups(self):
        for lookup, suffix in [('<=', 'lte'), ('>=', 'gte'), ('<', 'lt'), ('>', 'gt'), ('=', 'exact')]:
            # subTest makes a failure report which operator broke
            with self.subTest(lookup=lookup):
                parser = QueryParser(self, "user_abc {} 1234".format(lookup))
                key = "db__abc__{}".format(suffix)
                self.assertEqual(parser.q_objects, Q(**{key: 1234}))

        parser = QueryParser(self, "user_abc IS IN [12, 34]")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, Q(db__abc__in=[12, 34]))

    def test_string_lookups(self):
        for lookup, suffix, negated in [('CONTAINS', 'contains', False), ('ICONTAINS', 'icontains', False),
                                        ('MATCHES', 'regex', False), ('~=', 'regex', False), ('=', 'exact', False),
                                        ('!=', 'exact', True)]:
            # subTest makes a failure report which operator broke
            with self.subTest(lookup=lookup):
                parser = QueryParser(self, "user_def {} 'hello'".format(lookup))

                key = "db__def__{}".format(suffix)
                expected = Q(**{key: "hello"})

                if negated:
                    self.assertEqual(parser.q_objects, ~expected)
                else:
                    self.assertEqual(parser.q_objects, expected)

        parser = QueryParser(self, "user_def IS IN ['hello','world']")
        self.assertEqual(parser.q_objects, Q(db__def__in=['hello', 'world']))

    def test_empty_string_query(self):
        parser = QueryParser(self, "user_def = ''")
        key = "db__def__exact"
        expected = Q(**{key: ""})
        self.assertEqual(parser.q_objects, expected)

    def test_escaped_string_query(self):
        for quote in ["'", '"']:
            query = f"user_def = {quote}foo\\{quote}bar{quote}"
            parser = QueryParser(self, query)
            self.assertTrue(parser.is_valid)
            self.assertEqual(parser.error, None)
            expected = Q(**{"db__def__exact": f"foo\\{quote}bar"})
            self.assertEqual(parser.q_objects, expected)

    def test_limit_alone(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' LIMIT 42")
        self.assertEqual(parser.limit, 42)

    def test_limit_negative(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' LIMIT -42")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertEqual(parser.error, "Negative limits are not supported")

    def test_orderby_alone(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY -user_abc")
        self.assertEqual(parser.orderby, "-db__abc")

    def test_orderby_invalid_object(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY toto")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertEqual(parser.error, "The object 'toto' does not exist")

    def test_orderby_limit_interaction(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY user_def LIMIT 42")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects,
                         Q(db__abc__exact=123, db__def__exact='HELLO'))
        self.assertEqual(parser.limit, 42)
        self.assertEqual(parser.orderby, "db__def")

    def test_invalid_subquery(self):
        # 'user_abc' exists on the outer mapping only, not on NestedModel's
        parser = QueryParser(self, "user_abc=123 AND user_nes MATCHES (user_abc=123) AND user_def = 'TOTO'")

        self.assertFalse(parser.is_valid)
        self.assertEqual(parser.error, "The object 'user_abc' does not exist")
        self.assertTrue(parser.is_empty)

    def test_subquery(self):
        parser = QueryParser(self, "user_abc=123 AND "
                                   "user_nes MATCHES (nested1=123 AND ((nested2 = 'hello') OR (nested2 = 'world'))) "
                                   "AND user_def = 'TOTO'")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)

        # The fake manager recorded the Q object built from the nested query
        self.assertEqual(self.NestedModel.Object.q_object,
                         Q(nested_db__1__exact=123) & (Q(nested_db__2__exact="hello") | Q(nested_db__2__exact="world")))

    def test_complex_query1(self):
        parser = QueryParser(self, '''(user_abc IS IN ["toto","titi"] AND user_def=datetime(2018-06-23)) OR
                                      ((user_ghi > 456 AND NOT user_def ~= "hello" ) OR user_ghi < 456)''')
        q_filter = (Q(**{'db__abc__in': ['toto', 'titi']}) & Q(**{'db__def__exact':
                    datetime.datetime(2018, 6, 23, 0, 0, tzinfo=pytz.utc)})) | ((Q(**{'db__ghi__gt': 456})
                                                                                 & ~Q(**{'db__def__regex': 'hello'})) |
                                                                                Q(**{'db__ghi__lt': 456}))
        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, q_filter)

    def test_complex_query2(self):
        parser = QueryParser(self, '''(user_abc IS IN [2,3,4] OR user_abc NOT IN [2,3] )
                                      AND (user_abc <= 1 OR user_abc >= 0)''')
        q_filter = ((Q(**{'db__abc__in': [2, 3, 4]}) | ~Q(**{'db__abc__in': [2, 3]}))
                    & (Q(**{'db__abc__lte': 1}) | Q(**{'db__abc__gte': 0})))

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, q_filter)

    def test_ignore_fields__all_fields_ignored(self):
        parser = QueryParser(TestResult, "status_name = 'fail'", ignore_fields=["status_name"])
        # With the only condition ignored, the generated SQL has no WHERE clause
        self.assertEqual(
            ('SELECT DISTINCT "CIResults_testresult"."id", "CIResults_testresult"."test_id", '
             '"CIResults_testresult"."ts_run_id", "CIResults_testresult"."status_id", "CIResults_testresult"."url", '
             '"CIResults_testresult"."start", "CIResults_testresult"."duration", "CIResults_testresult"."command", '
             '"CIResults_testresult"."stdout", "CIResults_testresult"."stderr", "CIResults_testresult"."dmesg" FROM '
             '"CIResults_testresult"'),
            str(parser.objects.query)
        )

    def test_ignore_fields__complex_query_with_multiple_ignored_fields(self):
        parser = QueryParser(TestResult, "status_name = 'fail' AND (stdout = 'out' OR stderr = 'err')",
                             ignore_fields=["status_name", "stderr"])
        # Only the condition on the non-ignored 'stdout' field is left in the SQL
        self.assertEqual(
            ('SELECT DISTINCT "CIResults_testresult"."id", "CIResults_testresult"."test_id", '
             '"CIResults_testresult"."ts_run_id", "CIResults_testresult"."status_id", "CIResults_testresult"."url", '
             '"CIResults_testresult"."start", "CIResults_testresult"."duration", "CIResults_testresult"."command", '
             '"CIResults_testresult"."stdout", "CIResults_testresult"."stderr", "CIResults_testresult"."dmesg" FROM '
             '"CIResults_testresult" WHERE "CIResults_testresult"."stdout" = out'),
            str(parser.objects.query)
        )

319 

320 

class LegacyParserTests(TestCase):
    """Tests the conversion of legacy ``only__``/``exclude__`` keyword filters into a query string."""

    def setUp(self):
        # Install the test mapping on UserFiltrableMixin for the duration of one test
        # only: patch.object restores the previous state on cleanup, so the
        # replacement cannot leak into tests that run afterwards.
        patcher = patch.object(UserFiltrableMixin, 'filter_objects_to_db', {
            "user_abc": FilterObjectStr('db__abc'),
            "user_def": FilterObjectStr('db__def'),
            "user_ghi": FilterObjectStr('db__ghi'),
            "user_jkl": FilterObjectBool('db__jkl'),
            "user_mno": FilterObjectDuration('db__mno'),
        }, create=True)
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_no_filters(self):
        """Without any keyword filter, the generated query is empty."""
        parser = LegacyParser(UserFiltrableMixin)
        self.assertEqual(parser.query, "")

    def test_valid_filters(self):
        """Typed legacy values are rendered with the matching query operators."""
        parser = LegacyParser(UserFiltrableMixin,
                              only__user_abc__in=['toto', 'int(1234.3)'],
                              only__user_def__exact='datetime(2018-06-23)',
                              only__user_ghi__gt=['int(456)'],
                              only__user_jkl__exact='bool(1)',
                              only__user_mno__exact='duration(00:00:03)',
                              exclude__user_def__regex='str(hello)')
        self.assertEqual(parser.query,
                         "user_abc IS IN ['toto', 1234.3] AND user_def = datetime(2018-06-23) AND user_ghi > 456 "
                         "AND user_jkl = TRUE AND user_mno = duration(00:00:03) AND NOT (user_def ~= 'hello')")

    def test_regex_aggregation(self):
        """Several regex values for one field are merged into a single alternation."""
        parser = LegacyParser(UserFiltrableMixin, only__user_abc__regex=['toto', 'tata', 'titi'])
        self.assertEqual(parser.query, "user_abc ~= '(toto|tata|titi)'")

    def test_invalid_formats(self):
        """Malformed keys, unknown objects and unknown lookups all produce an empty query."""
        parser = LegacyParser(UserFiltrableMixin, balbla='ghujfdk', oops__user_abc__in=12,
                              only__invalid__in=13, only__user_abc__toto=14)
        self.assertEqual(parser.query, "")

355 

356 

class UserFiltrableMixinTests(TestCase):
    """Exercises ``TestResult.from_user_filters()`` with legacy, new-style and shortened queries."""

    def test_old_style(self):
        """Legacy ``only__...`` keyword filters end up in the SQL WHERE clause."""
        result = TestResult.from_user_filters(only__status_name__exact='pass')
        sql = str(result.objects.query)
        self.assertIn('WHERE "CIResults_textstatus"."name" = pass', sql)

    def test_new_style(self):
        """A textual query passed through ``query`` ends up in the SQL WHERE clause."""
        result = TestResult.from_user_filters(query=['status_name = "pass"'])
        sql = str(result.objects.query)
        self.assertIn('WHERE "CIResults_textstatus"."name" = pass', sql)

    def test_new_style_with_short_queries(self):
        """Shorthand keys resolve to their stored query, but a full query takes precedence."""
        stored_query = 'status_name = "toto"'
        explicit_query = 'status_name = "tata"'

        # Check that the shorthand versions resolve to the right query
        shortened = Shortener.get_or_create(stored_query)
        from_key = TestResult.from_user_filters(query_key=shortened.shorthand)
        self.assertEqual(from_key.user_query, stored_query)

        # Check that we prioritize full queries to shorthands
        from_both = TestResult.from_user_filters(query=explicit_query, query_key=shortened.shorthand)
        self.assertEqual(from_both.user_query, explicit_query)

    def test_sub_queries(self):
        """A ``machine_tag`` filter is expressed as an ``__in`` lookup over a sub-queryset."""
        parser = TestResult.from_user_filters(query=['machine_tag CONTAINS ["tag1", "tag2"]'])
        first_child = parser.q_objects.children[0]
        sub_sql = str(first_child[1].query)

        self.assertEqual(first_child[0], 'ts_run__machine__in')
        for expected_fragment in ('"name" = tag1', '"name" = tag2'):
            self.assertIn(expected_fragment, sub_sql)

385 

386 

class FilterObjectTests(TestCase):
    """Checks the ``description`` attribute exposed by ``FilterObject``."""

    def test_empty_description(self):
        """Without an explicit description, a placeholder text is reported."""
        filter_object = FilterObject("")
        self.assertEqual(filter_object.description, "<no description yet>")

    def test_with_description(self):
        """An explicit description is reported unchanged."""
        filter_object = FilterObject("", "My description")
        self.assertEqual(filter_object.description, "My description")

393 

394 

class FilterObjectDurationTests(TestCase):
    """Checks how ``FilterObjectDuration.parse_value()`` reports unparsable input."""

    def test_invalid_value(self):
        """A string that is not a duration raises ``ValueError`` quoting the value."""
        expected_pattern = "The value '1 month' does not represent a duration"
        with self.assertRaisesRegex(ValueError, expected_pattern):
            FilterObjectDuration.parse_value('1 month')

399 

400 

class BuildQueryFromRequestTests(TestCase):
    """Checks that ``QueryCreator`` turns GET parameters into a user query string."""

    def setUp(self):
        self.factory = RequestFactory()

    def test_build_machine_query_from_request(self):
        """Each GET parameter becomes a ``MATCHES`` clause, joined with ``AND``."""
        request = self.factory.get('/machine?name=name&description=description')
        built_query = QueryCreator(request, Machine).multiple_request_params_to_query()
        self.assertEqual(built_query.user_query,
                         "name MATCHES 'name' AND description MATCHES 'description'")

408 "name MATCHES 'name' AND description MATCHES 'description'")