Coverage for CIResults/filtering.py: 100%
526 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 09:20 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 09:20 +0000
1import operator
2import re
3import traceback
4from collections.abc import Callable
5from typing import Any
7import pytz
8from arpeggio import EOF, NoMatch, OneOrMore, Optional, ParserPython, PTNodeVisitor
9from arpeggio import RegExMatch as _
10from arpeggio import ZeroOrMore, visit_parse_tree
11from dateutil import parser as datetimeparser
12from django.contrib import messages
13from django.db.models import ForeignKey, ManyToManyField, Q
14from django.http.request import QueryDict
15from django.utils import timezone
16from django.utils.dateparse import parse_duration
17from django.utils.functional import cached_property
19from shortener.models import Shortener
22# Arpeggio's parser
23def val_none(): return _(r'NONE')
26def val_int(): return _(r'-?\d+')
29def val_str(): return [('"', _(r'([^"\\]|\\.)*'), '"'),
30 ("'", _(r"([^'\\]|\\.)*"), "'")]
33def val_bool(): return [r'TRUE', r'FALSE']
36def val_datetime(): return 'datetime(', [_(r'[^\)]+')], ')'
39def val_duration(): return 'duration(', [_(r'[^\)]+')], ')'
42def val_ago(): return 'ago(', [_(r'[^\)]+')], ')'
45def val_array(): return "[", OneOrMore(([val_none, val_str, val_int, val_bool, val_datetime, val_duration, val_ago],
46 Optional(','))), "]"
49def nested_expression(): return ZeroOrMore(ZeroOrMore(_(r'[^()]+')), ZeroOrMore("(", nested_expression, ")"),
50 ZeroOrMore(_(r'[^()]+')))
53def val_subquery(): return [('(', nested_expression, ')')]
56def filter_field(): return _(r'[a-zA-Z\d_-]+')
59def filter_object(): return _(r'\w+'), Optional(".", filter_field)
62def basic_filter(): return [(filter_object, ['IS IN', 'NOT IN'], val_array),
63 (filter_object, ['<=', r'<', r'>=', r'>'], [val_int, val_datetime,
64 val_duration, val_ago]),
65 (filter_object, ['=', '!='], [val_duration, val_datetime, val_int,
66 val_bool, val_str, val_none]),
67 (filter_object, [r'~=', r'MATCHES', r'ICONTAINS'], val_str),
68 (filter_object, [r'CONTAINS'], [val_str, val_array]),
69 (filter_object, [r'MATCHES'], val_subquery)]
72def orderby_object(): return _(r'-?\w+')
75def orderby(): return ("ORDER_BY", orderby_object)
78def limit(): return ("LIMIT", val_int)
81def factor(): return Optional("NOT"), [basic_filter, ("(", expression, ")")]
84def expression(): return factor, ZeroOrMore(["AND", "OR"], factor), Optional(orderby), Optional(limit)
87def query(): return Optional(expression), EOF
90class QueryVisitor(PTNodeVisitor):
91 class NoneObject:
92 pass
94 def __init__(self, model, ignore_fields=[], *arg, **kwargs):
95 """
96 Args:
97 ignore_fields (list): List of fields whose filter conditions will be ignored during parsing.
98 """
99 self.model = model
100 self.orderby = None
101 self.limit = None
102 self.ignore_db_paths = []
103 for field in ignore_fields:
104 if obj := self.model.filter_objects_to_db.get(field, {}):
105 self.ignore_db_paths.append(obj.db_path)
107 super().__init__(*arg, **kwargs)
109 def is_m2m(self, path: str) -> bool:
110 model = self.model
111 for field_name in path.split("__"):
112 if model._meta.get_field(field_name).__class__ is ManyToManyField:
113 return True
114 if model._meta.get_field(field_name).__class__ is ForeignKey:
115 model = getattr(model, field_name).field.remote_field.model
116 continue
117 break
118 return False
120 def visit_val_none(self, node, children):
121 # HACK: I would have rather returned None, but Arppegio interprets this as
122 # a <no match>... Instead, return a NoneObject that will later be converted
123 return QueryVisitor.NoneObject() # pragma: no cover
125 def visit_val_int(self, node, children):
126 return FilterObjectInteger.parse_value(node.value)
128 def visit_val_str(self, node, children):
129 if len(children) == 0:
130 return ""
131 if len(children) > 1:
132 raise ValueError("val_str cannot have more than one child") # pragma: no cover
133 return FilterObjectStr.parse_value(children[0])
135 def visit_val_bool(self, node, children):
136 return FilterObjectBool.parse_value(node.value)
138 def visit_val_datetime(self, node, children):
139 if len(children) > 1:
140 raise ValueError("val_datetime cannot have more than one child") # pragma: no cover
141 return FilterObjectDateTime.parse_value(children[0])
143 def visit_val_duration(self, node, children):
144 if len(children) > 1:
145 raise ValueError("val_duration cannot have more than one child") # pragma: no cover
146 return FilterObjectDuration.parse_value(children[0])
148 def visit_val_ago(self, node, children):
149 if len(children) > 1:
150 raise ValueError("val_ago cannot have more than one child") # pragma: no cover
151 duration = FilterObjectDuration.parse_value(children[0])
152 return timezone.now() - duration
154 def visit_filter_field(self, node, children):
155 if '__' in node.value:
156 raise ValueError("Dict object keys cannot contain the substring '__'")
158 return node.value
160 def visit_filter_object(self, node, children):
161 filter_obj = self.model.filter_objects_to_db.get(children[0])
162 if filter_obj is None:
163 raise ValueError("The object '{}' does not exist".format(children[0]))
165 if isinstance(filter_obj, FilterObjectJSON):
166 if len(children) != 2:
167 raise ValueError("The dict object '{}' requires a key to access its data".format(children[0]))
168 filter_obj = FilterObjectJSON(filter_obj._db_path, filter_obj.description, children[1])
169 elif len(children) != 1:
170 raise ValueError("The object '{}' cannot have an associated key".format(children[0]))
172 return filter_obj
174 def visit_val_array(self, node, children):
175 return [c for c in children if c != ',']
177 def visit_val_subquery(self, node, children):
178 out = ""
179 for x in list(node):
180 out += str(x.flat_str())
181 return out
183 def visit_basic_filter(self, node, children):
184 if len(children) != 3:
185 raise ValueError("basic_filter: Invalid amount of operands") # pragma: no cover
187 filter_obj, lookup, item = children
189 if filter_obj.db_path in self.ignore_db_paths:
190 return self.emit_empty()
192 # HACK: see visit_val_none()
193 if isinstance(item, QueryVisitor.NoneObject):
194 item = None # pragma: no cover
196 obj = None
197 if isinstance(filter_obj, FilterObjectModel):
198 item = filter_obj.parse_value(item)
199 obj = self.emit_is_in_operator(filter_obj, item)
200 else:
201 lookups_map = {
202 "<=": self.emit_lte_operator,
203 ">=": self.emit_gte_operator,
204 "<": self.emit_lt_operator,
205 ">": self.emit_gt_operator,
206 "CONTAINS": (
207 self.emit_contains_list_operator if isinstance(item, list) else self.emit_contains_string_operator
208 ),
209 "ICONTAINS": self.emit_icontains_operator,
210 "IS IN": self.emit_is_in_operator,
211 "NOT IN": self.emit_is_in_operator,
212 "MATCHES": self.emit_matches_operator,
213 "~=": self.emit_matches_operator,
214 "=": self.emit_equal_operator,
215 "!=": self.emit_equal_operator,
216 }
217 if lookup not in lookups_map:
218 raise ValueError("Unknown lookup '{}'".format(lookup)) # pragma: no cover
219 obj = lookups_map[lookup](filter_obj, item)
221 if lookup in ['!=', 'NOT IN']:
222 return self.emit_not_operator(obj)
223 else:
224 return obj
226 def visit_factor(self, node, children):
227 if len(children) > 1:
228 if children[0] == "NOT":
229 return self.emit_not_operator(children[-1])
230 return children[-1]
232 def visit_orderby_object(self, node, children):
233 reverse = node.value[0] == '-'
235 obj_name = node.value if not reverse else node.value[1:]
237 filter_obj = self.model.filter_objects_to_db.get(obj_name)
238 if filter_obj is not None:
239 return "{}{}".format("-" if reverse else "", filter_obj.db_path)
240 else:
241 raise ValueError("The object '{}' does not exist".format(obj_name))
243 def visit_orderby(self, node, children):
244 if len(children) == 1:
245 self.orderby = children[0]
246 else:
247 raise ValueError("orderby: Invalid amount of operands") # pragma: no cover
249 def visit_limit(self, node, children):
250 if len(children) == 1:
251 if children[0] < 0:
252 raise ValueError("Negative limits are not supported")
254 self.limit = children[0]
255 else:
256 raise ValueError("limit: Invalid amount of operands") # pragma: no cover
258 def visit_expression(self, node, children):
259 if len(children) >= 1:
260 result = children[0]
261 for i in range(2, len(children), 2):
262 if children[i-1] == "AND":
263 result = self.emit_and_operator(result, children[i])
264 elif children[i-1] == "OR":
265 result = self.emit_or_operator(result, children[i])
266 return result
268 def visit_query(self, node, children):
269 if len(children) > 1:
270 raise ValueError("query cannot have more than one child") # pragma: no cover
271 elif len(children) == 1:
272 return children[0]
273 else:
274 return self.emit_empty()
276 def emit_equal_operator(self, filter_obj, item):
277 raise NotImplementedError() # pragma: no cover
279 def emit_lte_operator(self, filter_obj, item):
280 raise NotImplementedError() # pragma: no cover
282 def emit_lt_operator(self, filter_obj, item):
283 raise NotImplementedError() # pragma: no cover
285 def emit_gte_operator(self, filter_obj, item):
286 raise NotImplementedError() # pragma: no cover
288 def emit_gt_operator(self, filter_obj, item):
289 raise NotImplementedError() # pragma: no cover
291 def emit_contains_string_operator(self, filter_obj, item):
292 raise NotImplementedError() # pragma: no cover
294 def emit_contains_list_operator(self, filter_obj, item):
295 raise NotImplementedError() # pragma: no cover
297 def emit_icontains_operator(self, filter_obj, item):
298 raise NotImplementedError() # pragma: no cover
300 def emit_is_in_operator(self, filter_obj, item):
301 raise NotImplementedError() # pragma: no cover
303 def emit_matches_operator(self, filter_obj, item):
304 raise NotImplementedError() # pragma: no cover
306 def emit_not_operator(self, x):
307 raise NotImplementedError() # pragma: no cover
309 def emit_and_operator(self, x, y):
310 raise NotImplementedError() # pragma: no cover
312 def emit_or_operator(self, x, y):
313 raise NotImplementedError() # pragma: no cover
315 def emit_empty(self):
316 raise NotImplementedError() # pragma: no cover
319class VisitorQ(QueryVisitor):
320 def __init__(self, model, ignore_fields=[], *arg, **kwargs):
321 super().__init__(model, ignore_fields=ignore_fields, *arg, **kwargs)
323 def get_related_model(self, path: str):
324 model = self.model
325 for field_name in filter(None, path.split("__")):
326 model = getattr(model, field_name).field.remote_field.model
327 return model
329 def process_compare_operator(self, filter_obj, item, condition_key):
330 key = filter_obj.db_path
331 # Intercept lookups on many-to-many fields to allow filtering by multiple values
332 # (e.g. machine_tag = "A" AND machine_tag = "B")
333 # NOTE: This is needed to make the Python and the Q parsers behave in the same way
334 if self.is_m2m(filter_obj.db_path):
335 # Key has to be split into two separate ones to filter by multiple nested values in relation
336 # "many-to-many" (example: ts_run__machine__tags__name):
337 # * "key": path to parent key that is queried over (based on example: ts_run__machine)
338 # * "subquery_key": relative path to child value key (based on example: tags__name)
339 key_parts = key.split("__")
340 key = "__".join(key_parts[:-2])
341 subquery_key = "__".join(key_parts[-2:]) + condition_key
342 objects = self.get_related_model(key).objects
343 if condition_key == "__exact" and isinstance(item, list):
344 for value in item:
345 objects = objects.filter(Q(**{f"{subquery_key}": value}))
346 else:
347 objects = objects.filter(Q(**{f"{subquery_key}": item}))
348 item = objects
349 key = key or "pk"
350 condition_key = "__in"
352 return Q(**{key + condition_key: item})
354 def emit_lte_operator(self, filter_obj, item):
355 return self.process_compare_operator(filter_obj, item, "__lte")
357 def emit_lt_operator(self, filter_obj, item):
358 return self.process_compare_operator(filter_obj, item, "__lt")
360 def emit_gte_operator(self, filter_obj, item):
361 return self.process_compare_operator(filter_obj, item, "__gte")
363 def emit_gt_operator(self, filter_obj, item):
364 return self.process_compare_operator(filter_obj, item, "__gt")
366 def emit_contains_string_operator(self, filter_obj, item):
367 return self.process_compare_operator(filter_obj, item, "__contains")
369 def emit_contains_list_operator(self, filter_obj, item):
370 return self.process_compare_operator(filter_obj, item, "__exact")
372 def emit_icontains_operator(self, filter_obj, item):
373 return self.process_compare_operator(filter_obj, item, "__icontains")
375 def emit_is_in_operator(self, filter_obj, item):
376 return self.process_compare_operator(filter_obj, item, "__in")
378 def emit_matches_operator(self, filter_obj, item):
379 return self.process_compare_operator(filter_obj, item, "__regex")
381 def emit_equal_operator(self, filter_obj, item):
382 return self.process_compare_operator(filter_obj, item, "__exact")
384 def emit_not_operator(self, x):
385 return ~x
387 def emit_and_operator(self, x, y):
388 return x & y
390 def emit_or_operator(self, x, y):
391 return x | y
393 def emit_empty(self):
394 return Q()
397def getnested(attr_path):
398 def nested_getter(obj):
399 for attr in attr_path.split('.'):
400 try:
401 obj = getattr(obj, attr)
402 except AttributeError:
403 obj = obj.get(attr)
404 if obj is None:
405 return None
406 return obj
407 return nested_getter
410def compose(f1, f2):
411 return lambda *args, **kwargs: f1(f2(*args, **kwargs))
414def create_and_op(f1, f2):
415 return lambda x: f1(x) and f2(x)
418def create_or_op(f1, f2):
419 return lambda x: f1(x) or f2(x)
422def function_compare_factory(fn, getter, value):
423 return lambda x: fn(getter(x), value)
426class VisitorLocal(QueryVisitor):
427 def __init__(self, model, ignore_fields=[], *arg, **kwargs):
428 super().__init__(model, ignore_fields=ignore_fields, *arg, **kwargs)
430 @staticmethod
431 def get_list_getter(field_name):
432 related_field_name = field_name.split(".")[-1]
433 field_name = ".".join(field_name.split(".")[:-1])
434 getter = getnested(field_name)
435 return lambda x: [getattr(value, related_field_name) for value in getter(x).all()]
437 def parse_field_name(self, filter_obj):
438 return filter_obj.db_path.replace("__", ".")
440 def emit_lte_operator(self, filter_obj, item):
441 return function_compare_factory(operator.le, getnested(self.parse_field_name(filter_obj)), item)
443 def emit_lt_operator(self, filter_obj, item):
444 return function_compare_factory(operator.lt, getnested(self.parse_field_name(filter_obj)), item)
446 def emit_gte_operator(self, filter_obj, item):
447 return function_compare_factory(operator.ge, getnested(self.parse_field_name(filter_obj)), item)
449 def emit_gt_operator(self, filter_obj, item):
450 return function_compare_factory(operator.gt, getnested(self.parse_field_name(filter_obj)), item)
452 def emit_contains_string_operator(self, filter_obj, item):
453 return function_compare_factory(operator.contains, getnested(self.parse_field_name(filter_obj)), item)
455 def emit_contains_list_operator(self, filter_obj, item):
456 return function_compare_factory(
457 lambda x, y: set(y).issubset(set(x)), self.get_list_getter(self.parse_field_name(filter_obj)), item
458 )
460 def emit_icontains_operator(self, filter_obj, item):
461 return function_compare_factory(
462 lambda x, y: y.lower() in x.lower(), getnested(self.parse_field_name(filter_obj)), item
463 )
465 def emit_is_in_operator(self, filter_obj, item):
466 if self.is_m2m(filter_obj.db_path):
467 return function_compare_factory(
468 lambda x, y: any([val in x for val in y]), self.get_list_getter(self.parse_field_name(filter_obj)), item
469 )
470 return function_compare_factory(lambda x, y: x in y, getnested(self.parse_field_name(filter_obj)), item)
472 def emit_matches_operator(self, filter_obj, item):
473 return function_compare_factory(
474 lambda x, y: bool(re.search(y, x)), getnested(self.parse_field_name(filter_obj)), item
475 )
477 def emit_equal_operator(self, filter_obj, item):
478 if self.is_m2m(filter_obj.db_path):
479 return function_compare_factory(
480 operator.contains, self.get_list_getter(self.parse_field_name(filter_obj)), item
481 )
482 return function_compare_factory(operator.eq, getnested(self.parse_field_name(filter_obj)), item)
484 def emit_not_operator(self, x):
485 return compose(operator.not_, x)
487 def emit_and_operator(self, x, y):
488 return create_and_op(x, y)
490 def emit_or_operator(self, x, y):
491 return create_or_op(x, y)
493 def emit_empty(self):
494 return lambda _: True
497class QueryParserPython:
498 def __init__(self, model, user_query, ignore_fields: list[str] = []):
499 self.model = model
500 self.user_query = user_query
501 self.error = None
502 self.matching_fn: Callable[[Any], bool]
504 try:
505 parser = ParserPython(query)
506 parse_tree = parser.parse(user_query)
507 query_visitor = VisitorLocal(model, ignore_fields=ignore_fields)
508 self.matching_fn = visit_parse_tree(parse_tree, query_visitor)
509 except (ValueError, NoMatch) as err:
510 self.error = str(err)
512 @property
513 def is_valid(self):
514 return self.error is None
517class QueryParser:
518 def __init__(self, model, user_query, ignore_fields: list[str] = []):
519 self.model = model
520 self.user_query = user_query
522 self.error = None
523 self.q_objects = Q()
524 self.orderby = None
525 self.limit = None
527 try:
528 parser = ParserPython(query)
529 parse_tree = parser.parse(self.user_query)
530 query_visitor = VisitorQ(self.model, ignore_fields=ignore_fields)
532 self.q_objects = visit_parse_tree(parse_tree, query_visitor)
533 self.orderby = query_visitor.orderby
534 self.limit = query_visitor.limit
535 except ValueError as e:
536 self.error = str(e)
537 except NoMatch as e:
538 self.error = str(e)
540 @property
541 def query_key(self):
542 return Shortener.get_or_create(self.user_query).shorthand
544 @property
545 def is_valid(self):
546 return self.error is None
548 @property
549 def is_empty(self):
550 return not self.is_valid or len(self.user_query) == 0
552 @cached_property
553 def objects(self):
554 if self.is_valid:
555 query = self.model.objects.filter(self.q_objects).distinct()
556 query = query.order_by(self.orderby) if self.orderby is not None else query
557 return query[:self.limit] if self.limit is not None else query
558 else:
559 return self.model.objects.none()
562class LegacyParser:
563 userfilters_allowed_lookups = {'exact': '=', 'in': 'IS IN', 'regex': '~=', 'contains': 'CONTAINS',
564 'icontains': 'ICONTAINS', 'gt': '>', 'gte': '>=', 'lt': '<', 'lte': '<='}
565 userfilters_allowed_types = ['str', 'int', 'bool', 'datetime', 'duration']
567 def __init__(self, model, **user_filters):
568 # Filters should all be of the following format:
569 # (only|exclude)__(object)__(in|regex|gt|lt) = str or format(value)
570 lookups = "|".join(self.userfilters_allowed_lookups.keys())
571 format_re = re.compile((r'(?P<action>(only|exclude))__(?P<object>\w+)__'
572 '(?P<lookup>({lookups}))'.format(lookups=lookups)))
574 # Iterate through the user filters, match them to our format regex,
575 # then construct the right ORM call
576 only = []
577 exclude = []
578 for key, item in user_filters.items():
579 match = format_re.match(key)
580 if match:
581 fields = match.groupdict()
583 db_object = model.filter_objects_to_db.get(fields['object'])
584 if db_object is None:
585 continue
587 # aggregate all regular expressions into one request
588 if fields['lookup'] == 'regex' and isinstance(item, list) and len(item) > 1:
589 item = r'('+'|'.join(item)+')'
591 # Try converting the item to the right unit
592 item = self._convert_user_values(item)
594 bfilter = "{} {} {}".format(fields['object'],
595 self.userfilters_allowed_lookups.get(fields['lookup']),
596 item)
598 if fields['action'] == 'only':
599 only.append(bfilter)
600 else:
601 exclude.append(bfilter)
603 self.query = " AND ".join(only)
604 if len(exclude) > 0:
605 if len(only) > 0:
606 self.query += ' AND '
607 self.query += "NOT ({})".format(" AND ".join(exclude))
609 @classmethod
610 def _convert_user_value(cls, value):
611 # Will automatically be cached by python
612 types = "|".join(cls.userfilters_allowed_types)
613 item_re = re.compile(r'(?P<type>({types}))\((?P<value>.*)\)'.format(types=types))
615 match = item_re.match(value)
616 if match:
617 fields = match.groupdict()
619 try:
620 if fields['type'] == 'str':
621 return "'{}'".format(fields['value'])
622 elif fields['type'] == 'bool':
623 return "TRUE" if FilterObjectBool.parse_value(fields['value']) else "FALSE"
624 elif fields['type'] == 'int':
625 return fields['value']
626 elif fields['type'] == 'datetime' or fields['type'] == 'duration':
627 return value
628 except Exception: # pragma: no cover
629 traceback.print_exc() # pragma: no cover
631 # Default to the variable being a string
632 return "'" + value + "'"
634 @classmethod
635 def _convert_user_values(cls, items):
636 # detect whether we have a singular value or a list
637 if isinstance(items, list):
638 if len(items) > 1:
639 new = []
640 for item in items:
641 new.append(cls._convert_user_value(item))
642 return "[" + ", ".join(new) + "]"
643 else:
644 return cls._convert_user_value(items[0])
645 else:
646 return cls._convert_user_value(items)
649class UserFiltrableMixin:
650 @classmethod
651 def _get_value_from_params(cls, user_filters, key):
652 val = user_filters.get(key)
653 if isinstance(val, list) and len(val) == 1:
654 val = val[0]
655 return val
657 @classmethod
658 def from_user_filters(cls, prefix=None, **user_filters):
659 query_param_name = f'{prefix}_query' if prefix is not None else 'query'
660 query = cls._get_value_from_params(user_filters, query_param_name)
661 if query is None:
662 query_key = cls._get_value_from_params(user_filters, f'{query_param_name}_key')
663 short = Shortener.objects.filter(shorthand=query_key).first()
664 if short is not None:
665 query = short.full
667 if query is not None:
668 return QueryParser(cls, query)
669 else:
670 query = LegacyParser(cls, **user_filters).query
671 return QueryParser(cls, query)
674class FilterObject:
675 def __init__(self, db_path, description=None):
676 self._db_path = db_path
677 self._description = description
679 @property
680 def db_path(self):
681 return self._db_path
683 @property
684 def description(self):
685 if self._description is None:
686 return "<no description yet>"
687 else:
688 return self._description
691class FilterObjectJSON(FilterObject):
692 data_type = "anything"
693 documentation = "Expected format: <JSON field>.<key>"
694 test_value = "test"
696 def __init__(self, db_path, description=None, key=None):
697 self.key = key
698 super().__init__(db_path, description)
700 @property
701 def db_path(self):
702 if self.key is None:
703 raise ValueError("Dict field require a key to be accessed") # pragma: no cover
704 return "{}__{}".format(self._db_path, self.key)
707class FilterObjectStr(FilterObject):
708 data_type = "string"
709 documentation = "Expected format: anything. Use quotes for the new query language (\"\" or ''). " \
710 "Escape quotes by placing '\\' before quote character."
711 test_value = "str_test"
713 def __init__(self, db_path, description=None):
714 super().__init__(db_path, description)
716 @classmethod
717 def parse_value(cls, value):
718 return str(value)
721class FilterObjectDateTime(FilterObject):
722 data_type = "datetime"
723 documentation = "Expected format: datetime(YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ])"
724 test_value = "2019-01-01"
726 def __init__(self, db_path, description=None):
727 super().__init__(db_path, description)
729 @classmethod
730 def parse_value(cls, value):
731 return timezone.make_aware(datetimeparser.parse(value), pytz.utc)
734class FilterObjectDuration(FilterObject):
735 data_type = "duration"
736 documentation = 'Expected format: "duration(DD HH:MM:SS.uuuuuu)", or "duration(P4DT1H15M20S)" (ISO 8601), ' \
737 'or "duration(3 days 04:05:06)" (PostgreSQL).'
738 test_value = "123.456 seconds"
740 def __init__(self, db_path, description=None):
741 super().__init__(db_path, description)
743 @classmethod
744 def parse_value(cls, value):
745 duration = parse_duration(value)
746 if duration is None:
747 raise ValueError("The value '{}' does not represent a duration. {}".format(value, cls.documentation))
748 return duration
751class FilterObjectBool(FilterObject):
752 data_type = "boolean"
753 documentation = "Supported values: bool(false)/bool(0) or bool(true)/bool(1). " \
754 "Use TRUE or FALSE for the new query language."
755 test_value = "True"
757 def __init__(self, db_path, description=None):
758 super().__init__(db_path, description)
760 @classmethod
761 def parse_value(cls, value):
762 return str(value).lower() in ["1", "true"]
765class FilterObjectInteger(FilterObject):
766 data_type = "integer"
767 documentation = "Supported values: int(12345). Use 12345 for the new query language."
768 test_value = 12345
770 def __init__(self, db_path, description=None):
771 super().__init__(db_path, description)
773 @classmethod
774 def parse_value(cls, value):
775 return int(float(value))
778class FilterObjectModel(FilterObject):
779 data_type = "subquery"
780 documentation = "Expected format: Any query compatible with the model selected"
782 def __init__(self, model, db_path, description=None):
783 self.model = model
784 super().__init__(db_path, description)
786 def parse_value(self, value):
787 result = QueryParser(self.model, value)
788 if not result.is_valid:
789 raise ValueError(result.error)
790 return result.objects
793class QueryCreator:
794 def __init__(self, request, Model, prefix=None, default_query_parameters={}):
795 self.request = request
796 self.Model = Model
797 self.prefix = prefix
798 self.default_query_parameters = default_query_parameters
800 def __create_query_from_filters(self, **requested_filters):
801 query = self.Model.from_user_filters(self.prefix, **requested_filters)
802 if len(query.user_query) > 0:
803 if not query.is_valid and self.request:
804 messages.error(self.request, "Filtering error: " + query.error)
805 return query
806 return None
808 def __build_query_string(self):
809 op_mappings = {
810 'string': 'MATCHES',
811 'datetime': 'MATCHES',
812 'integer': 'MATCHES',
813 }
814 query_str = ""
815 for obj in self.Model.filter_objects_to_db:
816 param_value = self.request.GET.get(f'{self.prefix}_{obj}' if self.prefix else obj)
817 if param_value:
818 data_type = self.Model.filter_objects_to_db[obj].data_type
819 if len(query_str) > 0:
820 query_str += " AND "
821 query_str += f"{obj} {op_mappings[data_type]} '{param_value}'"
822 return query_str
824 def string_to_query(self, query_string):
825 query_param_name = f'{self.prefix}_query' if self.prefix else 'query'
826 query_dict = QueryDict('', mutable=True)
827 query_dict.update({f'{query_param_name}': query_string})
828 query = self.__create_query_from_filters(**query_dict)
829 if query:
830 return query
831 return self.Model.from_user_filters(**self.default_query_parameters)
833 def request_to_query(self):
834 for params in [self.request.POST, self.request.GET]:
835 # convert the user filters to a normal dictionary to prevent issues when
836 # inserting new values
837 requested_filters = params.copy()
838 query = self.__create_query_from_filters(**requested_filters)
839 if query:
840 return query
842 return self.Model.from_user_filters(**self.default_query_parameters)
844 def multiple_request_params_to_query(self):
845 query_str = self.__build_query_string()
846 return self.string_to_query(query_str)