Coverage for CIResults/tests/test_filtering.py: 100%
238 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 13:11 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 13:11 +0000
1from unittest.mock import patch
2from django.test import TestCase
3from django.test.client import RequestFactory
4from django.db.models import Q
6from CIResults.models import Issue, Bug, TestsuiteRun, TestResult, KnownFailure
7from CIResults.filtering import UserFiltrableMixin, FilterObjectStr, FilterObjectBool, FilterObject, LegacyParser
8from CIResults.filtering import QueryParser, FilterObjectInteger, FilterObjectDateTime, FilterObjectDuration
9from CIResults.filtering import FilterObjectModel, FilterObjectJSON
10from CIResults.filtering import QueryCreator, QueryVisitor
11from CIResults.models import Machine
12from shortener.models import Shortener
14import datetime
15import pytz
class UserFiltrableTestsMixin:
    """Shared check that each entry of ``Model.filter_objects_to_db`` can be used in a queryset filter.

    Concrete test cases combine this mixin with ``TestCase`` and set the ``Model`` class attribute to the
    model under test.
    """

    def test_filter_objects_to_db(self):
        # Abort if the class does not have a model
        if not hasattr(self, 'Model'):
            raise ValueError("The class '{}' does not have a 'Model' attribute".format(self))  # pragma: no cover

        # Abort if the object does not have the filter_objects_to_db attribute
        if not hasattr(self.Model, 'filter_objects_to_db'):
            raise ValueError("The model '{}' does not have a 'filter_objects_to_db' "
                             "attribute".format(self.Model))  # pragma: no cover

        # execute the query with USE_TZ=False to ignore the naive datetime warning
        with self.settings(USE_TZ=False):
            for field_name, db_obj in self.Model.filter_objects_to_db.items():
                # One sub-test per field, so that a broken field does not hide the ones checked after it
                with self.subTest(field=field_name):
                    if isinstance(db_obj, FilterObjectModel):
                        # Model-backed filters are matched against a queryset of the related model
                        filter_name = '{}__in'.format(db_obj.db_path)
                        value = db_obj.model.objects.none()
                    else:
                        if isinstance(db_obj, FilterObjectJSON):
                            # NOTE(review): a key is set before db_path is read; presumably the JSON filter
                            # needs one to produce a usable path - confirm in CIResults.filtering
                            db_obj.key = 'key'
                        filter_name = '{}__exact'.format(db_obj.db_path)
                        value = db_obj.test_value

                    try:
                        self.Model.objects.filter(**{filter_name: value})
                    except Exception as e:  # pragma: no cover
                        self.fail("Class {}'s field '{}' is not working: {}.".format(self.Model,
                                                                                     field_name,
                                                                                     str(e)))  # pragma: no cover
class BugTests(TestCase, UserFiltrableTestsMixin):
    """Runs the inherited test_filter_objects_to_db check against the Bug model."""

    Model = Bug
class IssueTests(TestCase, UserFiltrableTestsMixin):
    """Runs the inherited test_filter_objects_to_db check against the Issue model."""

    Model = Issue
class TestsuiteRunTests(TestCase, UserFiltrableTestsMixin):
    """Runs the inherited test_filter_objects_to_db check against the TestsuiteRun model."""

    Model = TestsuiteRun
class TestResultTests(TestCase, UserFiltrableTestsMixin):
    """Runs the inherited test_filter_objects_to_db check against the TestResult model."""

    Model = TestResult
class KnownFailureTests(TestCase, UserFiltrableTestsMixin):
    """Runs the inherited test_filter_objects_to_db check against the KnownFailure model."""

    Model = KnownFailure
class QueryVisitorTests(TestCase):
    """Checks QueryVisitor.get_related_model() when built for the KnownFailure model."""

    def test_get_related_model(self):
        # "result" on KnownFailure is expected to resolve to the TestResult model
        visitor = QueryVisitor(KnownFailure)
        related_model = visitor.get_related_model("result")
        self.assertEqual(related_model, TestResult)

    def test_get_related_model_no_attribute(self):
        # An attribute the model does not have must be reported through an AttributeError
        expected_message = "'KnownFailure' has no attribute 'status'"
        visitor = QueryVisitor(KnownFailure)
        with self.assertRaisesMessage(AttributeError, expected_message):
            visitor.get_related_model("status")
class QueryParserTests(TestCase):
    """Exercises QueryParser: error reporting, value types, lookups, LIMIT/ORDER_BY, sub-queries, ignored fields.

    Most tests pass the test case itself to QueryParser in place of a model; setUp() gives it the
    ``filter_objects_to_db`` mapping that the real models expose.
    """

    class NestedModel:
        # Stand-in for a related model, referenced by the "user_nes" FilterObjectModel entry in setUp()
        class Object:
            # Last Q object handed to filter(); kept on the class so that tests can inspect it afterwards
            q_object = None

            def filter(self, q_object):
                QueryParserTests.NestedModel.Object.q_object = q_object
                return Bug.objects.none()

        objects = Object()

        filter_objects_to_db = {
            "nested1": FilterObjectInteger('nested_db__1'),
            "nested2": FilterObjectStr('nested_db__2'),
        }

    # Column list of the SQL expected by the test_ignore_fields__* tests for an unfiltered TestResult query
    TESTRESULT_SELECT_SQL = (
        'SELECT DISTINCT "CIResults_testresult"."id", "CIResults_testresult"."test_id", '
        '"CIResults_testresult"."ts_run_id", "CIResults_testresult"."status_id", "CIResults_testresult"."url", '
        '"CIResults_testresult"."start", "CIResults_testresult"."duration", "CIResults_testresult"."command", '
        '"CIResults_testresult"."stdout", "CIResults_testresult"."stderr", "CIResults_testresult"."dmesg" FROM '
        '"CIResults_testresult"'
    )

    def setUp(self):
        self.filter_objects_to_db = {
            "user_abc": FilterObjectInteger('db__abc'),
            "user_def": FilterObjectStr('db__def'),
            "user_ghi": FilterObjectDateTime('db__ghi'),
            "user_jkl": FilterObjectDuration('db__jkl'),
            "user_mno": FilterObjectBool('db__mno'),
            "user_pqr": FilterObjectDateTime('db__pqr'),
            "user_nes": FilterObjectModel(self.NestedModel, "db__nes"),
            "user_json": FilterObjectJSON('db__json'),
        }

    def test_empty_query(self):
        parser = QueryParser(self, "")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, None)

    def test_unknown_object_name(self):
        parser = QueryParser(self, "hello = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The object 'hello' does not exist")

    def test_key_with_double_underscore(self):
        parser = QueryParser(self, "user_json.toto__tata = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "Dict object keys cannot contain the substring '__'")

    def test_two_keys_on_keyed_object(self):
        parser = QueryParser(self, "user_json.toto.tata = 'world'")

        # NOTE(review): unlike the sibling tests, the error message is not checked here
        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())

    def test_no_key_on_keyed_object(self):
        parser = QueryParser(self, "user_json = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The dict object 'user_json' requires a key to access its data")

    def test_key_on_non_keyed_object(self):
        parser = QueryParser(self, "user_pqr.toto = 'world'")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "The object 'user_pqr' cannot have an associated key")

    def test_invalid_syntax(self):
        parser = QueryParser(self, "hello = 'world")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertTrue(parser.is_empty)
        self.assertEqual(parser.q_objects, Q())
        self.assertEqual(parser.error, "Expected ''' at position (1, 15) => 'o = 'world*'.")

    @patch('django.utils.timezone.now',
           return_value=datetime.datetime.strptime('2019-01-01T00:00:05', "%Y-%m-%dT%H:%M:%S"))
    def test_parsing_all_types(self, now_mocked):
        # NOTE(review): there is no space between 'bar' and the following AND; this test expects the
        # parser to accept the query anyway, so the input is kept as-is.
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' AND user_ghi =datetime(2019-02-01) "
                                   "AND user_jkl = duration(00:00:03) AND user_jkl > ago(00:00:05) "
                                   "AND user_mno = TRUE AND user_pqr = NONE AND user_json.foo_bar = 'bar'"
                                   "AND user_json.foo_bar2 = 42")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        # HACK: Using 'children' attribute and set() here because the ordering was different, for some reason, between
        # Q objects, which caused direct comparison to fail.
        self.assertEqual(set(parser.q_objects.children),
                         set(Q(db__abc__exact=123, db__def__exact='HELLO', db__mno__exact=True,
                               db__ghi__exact=FilterObjectDateTime.parse_value('2019-02-01'),
                               db__jkl__exact=datetime.timedelta(seconds=3),
                               db__jkl__gt=datetime.datetime.strptime('2019-01-01', "%Y-%m-%d"),
                               db__pqr__exact=None, db__json__foo_bar__exact='bar',
                               db__json__foo_bar2__exact=42).children))

    def test_integer_lookups(self):
        lookups = [('<=', 'lte'), ('>=', 'gte'), ('<', 'lt'), ('>', 'gt'), ('=', 'exact')]
        for lookup, suffix in lookups:
            # Sub-tests make a failure name the operator that broke
            with self.subTest(lookup=lookup):
                parser = QueryParser(self, "user_abc {} 1234".format(lookup))
                key = "db__abc__{}".format(suffix)
                self.assertEqual(parser.q_objects, Q(**{key: 1234}))

        parser = QueryParser(self, "user_abc IS IN [12, 34]")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, Q(db__abc__in=[12, 34]))

    def test_string_lookups(self):
        lookups = [('CONTAINS', 'contains', False), ('ICONTAINS', 'icontains', False),
                   ('MATCHES', 'regex', False), ('~=', 'regex', False), ('=', 'exact', False),
                   ('!=', 'exact', True)]
        for lookup, suffix, negated in lookups:
            # Sub-tests make a failure name the operator that broke
            with self.subTest(lookup=lookup):
                parser = QueryParser(self, "user_def {} 'hello'".format(lookup))

                key = "db__def__{}".format(suffix)
                expected = Q(**{key: "hello"})
                if negated:
                    expected = ~expected

                self.assertEqual(parser.q_objects, expected)

        parser = QueryParser(self, "user_def IS IN ['hello','world']")
        self.assertEqual(parser.q_objects, Q(db__def__in=['hello', 'world']))

    def test_empty_string_query(self):
        parser = QueryParser(self, "user_def = ''")
        key = "db__def__exact"
        expected = Q(**{key: ""})
        self.assertEqual(parser.q_objects, expected)

    def test_escaped_string_query(self):
        for quote in ["'", '"']:
            with self.subTest(quote=quote):
                query = f"user_def = {quote}foo\\{quote}bar{quote}"
                parser = QueryParser(self, query)
                self.assertTrue(parser.is_valid)
                self.assertEqual(parser.error, None)
                # The expected value still contains the backslash in front of the quote
                expected = Q(**{"db__def__exact": f"foo\\{quote}bar"})
                self.assertEqual(parser.q_objects, expected)

    def test_limit_alone(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' LIMIT 42")
        self.assertEqual(parser.limit, 42)

    def test_limit_negative(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' LIMIT -42")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertEqual(parser.error, "Negative limits are not supported")

    def test_orderby_alone(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY -user_abc")
        self.assertEqual(parser.orderby, "-db__abc")

    def test_orderby_invalid_object(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY toto")

        self.assertFalse(parser.is_valid, parser.error)
        self.assertEqual(parser.error, "The object 'toto' does not exist")

    def test_orderby_limit_interaction(self):
        parser = QueryParser(self, "user_abc=123 AND user_def = 'HELLO' ORDER_BY user_def LIMIT 42")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects,
                         Q(db__abc__exact=123, db__def__exact='HELLO'))
        self.assertEqual(parser.limit, 42)
        self.assertEqual(parser.orderby, "db__def")

    def test_invalid_subquery(self):
        # 'user_abc' is not one of NestedModel's filter objects, so the sub-query must be rejected
        parser = QueryParser(self, "user_abc=123 AND user_nes MATCHES (user_abc=123) AND user_def = 'TOTO'")

        self.assertFalse(parser.is_valid)
        self.assertEqual(parser.error, "The object 'user_abc' does not exist")
        self.assertTrue(parser.is_empty)

    def test_subquery(self):
        parser = QueryParser(self, "user_abc=123 AND "
                                   "user_nes MATCHES (nested1=123 AND ((nested2 = 'hello') OR (nested2 = 'world'))) "
                                   "AND user_def = 'TOTO'")

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)

        # The sub-query is observed through the Q object recorded by NestedModel.Object.filter()
        self.assertEqual(self.NestedModel.Object.q_object,
                         Q(nested_db__1__exact=123) & (Q(nested_db__2__exact="hello") | Q(nested_db__2__exact="world")))

    def test_complex_query1(self):
        parser = QueryParser(self, '''(user_abc IS IN ["toto","titi"] AND user_def=datetime(2018-06-23)) OR
                                      ((user_ghi > 456 AND NOT user_def ~= "hello" ) OR user_ghi < 456)''')

        # Built with the same grouping as the query so that the resulting Q trees compare equal
        left = Q(**{'db__abc__in': ['toto', 'titi']}) \
            & Q(**{'db__def__exact': datetime.datetime(2018, 6, 23, 0, 0, tzinfo=pytz.utc)})
        right = (Q(**{'db__ghi__gt': 456}) & ~Q(**{'db__def__regex': 'hello'})) | Q(**{'db__ghi__lt': 456})
        q_filter = left | right

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, q_filter)

    def test_complex_query2(self):
        parser = QueryParser(self, '''(user_abc IS IN [2,3,4] OR user_abc NOT IN [2,3] )
                                      AND (user_abc <= 1 OR user_abc >= 0)''')
        q_filter = (Q(**{'db__abc__in': [2, 3, 4]}) | ~Q(**{'db__abc__in': [2, 3]}))\
            & (Q(**{'db__abc__lte': 1}) | Q(**{'db__abc__gte': 0}))

        self.assertTrue(parser.is_valid, parser.error)
        self.assertFalse(parser.is_empty)
        self.assertEqual(parser.q_objects, q_filter)

    def test_ignore_fields__all_fields_ignored(self):
        # With the only referenced field ignored, the expected SQL has no WHERE clause
        parser = QueryParser(TestResult, "status_name = 'fail'", ignore_fields=["status_name"])
        self.assertEqual(self.TESTRESULT_SELECT_SQL, str(parser.objects.query))

    def test_ignore_fields__complex_query_with_multiple_ignored_fields(self):
        # Only the condition on 'stdout' is expected to remain once the ignored fields are dropped
        parser = QueryParser(TestResult, "status_name = 'fail' AND (stdout = 'out' OR stderr = 'err')",
                             ignore_fields=["status_name", "stderr"])
        self.assertEqual(
            self.TESTRESULT_SELECT_SQL + ' WHERE "CIResults_testresult"."stdout" = out',
            str(parser.objects.query)
        )
class LegacyParserTests(TestCase):
    """Checks the query strings LegacyParser builds from ``only__``/``exclude__`` keyword filters."""

    def setUp(self):
        # The mapping lives on the shared UserFiltrableMixin class: patch it for the duration of one test
        # and restore it afterwards, so the fake fields never leak into tests that run later.
        patcher = patch.object(UserFiltrableMixin, 'filter_objects_to_db', {
            "user_abc": FilterObjectStr('db__abc'),
            "user_def": FilterObjectStr('db__def'),
            "user_ghi": FilterObjectStr('db__ghi'),
            "user_jkl": FilterObjectBool('db__jkl'),
            "user_mno": FilterObjectDuration('db__mno'),
        }, create=True)  # create=True: also works if the mixin does not define the attribute itself
        patcher.start()
        self.addCleanup(patcher.stop)

    def test_no_filters(self):
        parser = LegacyParser(UserFiltrableMixin)
        self.assertEqual(parser.query, "")

    def test_valid_filters(self):
        parser = LegacyParser(UserFiltrableMixin,
                              only__user_abc__in=['toto', 'int(1234.3)'],
                              only__user_def__exact='datetime(2018-06-23)',
                              only__user_ghi__gt=['int(456)'],
                              only__user_jkl__exact='bool(1)',
                              only__user_mno__exact='duration(00:00:03)',
                              exclude__user_def__regex='str(hello)')
        self.assertEqual(parser.query,
                         "user_abc IS IN ['toto', 1234.3] AND user_def = datetime(2018-06-23) AND user_ghi > 456 "
                         "AND user_jkl = TRUE AND user_mno = duration(00:00:03) AND NOT (user_def ~= 'hello')")

    def test_regex_aggregation(self):
        # Several regex values for one field are expected to collapse into a single alternation
        parser = LegacyParser(UserFiltrableMixin, only__user_abc__regex=['toto', 'tata', 'titi'])
        self.assertEqual(parser.query, "user_abc ~= '(toto|tata|titi)'")

    def test_invalid_formats(self):
        # Malformed keyword, unknown prefix, unknown object and unknown lookup: all expected to be dropped
        parser = LegacyParser(UserFiltrableMixin, balbla='ghujfdk', oops__user_abc__in=12,
                              only__invalid__in=13, only__user_abc__toto=14)
        self.assertEqual(parser.query, "")
class UserFiltrableMixinTests(TestCase):
    """Checks TestResult.from_user_filters() with legacy keywords, query strings and shortened queries."""

    def test_old_style(self):
        # Legacy keyword filter
        result = TestResult.from_user_filters(only__status_name__exact='pass')
        generated_sql = str(result.objects.query)
        self.assertIn('WHERE "CIResults_textstatus"."name" = pass', generated_sql)

    def test_new_style(self):
        # Query-language filter, passed as a list
        result = TestResult.from_user_filters(query=['status_name = "pass"'])
        generated_sql = str(result.objects.query)
        self.assertIn('WHERE "CIResults_textstatus"."name" = pass', generated_sql)

    def test_new_style_with_short_queries(self):
        stored_query = 'status_name = "toto"'
        explicit_query = 'status_name = "tata"'

        # Check that the shorthand versions resolve to the right query
        shortened = Shortener.get_or_create(stored_query)
        from_key = TestResult.from_user_filters(query_key=shortened.shorthand)
        self.assertEqual(from_key.user_query, stored_query)

        # Check that we prioritize full queries to shorthands
        from_both = TestResult.from_user_filters(query=explicit_query, query_key=shortened.shorthand)
        self.assertEqual(from_both.user_query, explicit_query)

    def test_sub_queries(self):
        parser = TestResult.from_user_filters(query=['machine_tag CONTAINS ["tag1", "tag2"]'])

        # The first child of the Q object is a (lookup name, sub-queryset) pair
        lookup_name, sub_queryset = parser.q_objects.children[0]
        sub_query = str(sub_queryset.query)

        self.assertEqual(lookup_name, 'ts_run__machine__in')
        for tag in ('tag1', 'tag2'):
            self.assertIn('"name" = {}'.format(tag), sub_query)
class FilterObjectTests(TestCase):
    """Checks the ``description`` attribute exposed by FilterObject."""

    def test_empty_description(self):
        # Without an explicit description, a placeholder text is expected
        filter_object = FilterObject("")
        self.assertEqual(filter_object.description, "<no description yet>")

    def test_with_description(self):
        # An explicit description is expected back unchanged
        filter_object = FilterObject("", "My description")
        self.assertEqual(filter_object.description, "My description")
class FilterObjectDurationTests(TestCase):
    """Checks how FilterObjectDuration.parse_value() rejects input that is not a duration."""

    def test_invalid_value(self):
        expected_error = "The value '1 month' does not represent a duration"
        with self.assertRaisesRegex(ValueError, expected_error):
            FilterObjectDuration.parse_value('1 month')
class BuildQueryFromRequestTests(TestCase):
    """Checks the user query QueryCreator builds from the GET parameters of a request."""

    def setUp(self):
        self.factory = RequestFactory()

    def test_build_machine_query_from_request(self):
        # Each GET parameter is expected to become a MATCHES clause, joined with AND
        request = self.factory.get('/machine?name=name&description=description')
        built = QueryCreator(request, Machine).multiple_request_params_to_query()
        self.assertEqual(built.user_query,
                         "name MATCHES 'name' AND description MATCHES 'description'")