Coverage for CIResults/filtering.py: 100%

526 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-19 09:20 +0000

1import operator 

2import re 

3import traceback 

4from collections.abc import Callable 

5from typing import Any 

6 

7import pytz 

8from arpeggio import EOF, NoMatch, OneOrMore, Optional, ParserPython, PTNodeVisitor 

9from arpeggio import RegExMatch as _ 

10from arpeggio import ZeroOrMore, visit_parse_tree 

11from dateutil import parser as datetimeparser 

12from django.contrib import messages 

13from django.db.models import ForeignKey, ManyToManyField, Q 

14from django.http.request import QueryDict 

15from django.utils import timezone 

16from django.utils.dateparse import parse_duration 

17from django.utils.functional import cached_property 

18 

19from shortener.models import Shortener 

20 

21 

22# Arpeggio's parser 

23def val_none(): return _(r'NONE') 

24 

25 

26def val_int(): return _(r'-?\d+') 

27 

28 

29def val_str(): return [('"', _(r'([^"\\]|\\.)*'), '"'), 

30 ("'", _(r"([^'\\]|\\.)*"), "'")] 

31 

32 

33def val_bool(): return [r'TRUE', r'FALSE'] 

34 

35 

36def val_datetime(): return 'datetime(', [_(r'[^\)]+')], ')' 

37 

38 

39def val_duration(): return 'duration(', [_(r'[^\)]+')], ')' 

40 

41 

42def val_ago(): return 'ago(', [_(r'[^\)]+')], ')' 

43 

44 

45def val_array(): return "[", OneOrMore(([val_none, val_str, val_int, val_bool, val_datetime, val_duration, val_ago], 

46 Optional(','))), "]" 

47 

48 

49def nested_expression(): return ZeroOrMore(ZeroOrMore(_(r'[^()]+')), ZeroOrMore("(", nested_expression, ")"), 

50 ZeroOrMore(_(r'[^()]+'))) 

51 

52 

53def val_subquery(): return [('(', nested_expression, ')')] 

54 

55 

56def filter_field(): return _(r'[a-zA-Z\d_-]+') 

57 

58 

59def filter_object(): return _(r'\w+'), Optional(".", filter_field) 

60 

61 

62def basic_filter(): return [(filter_object, ['IS IN', 'NOT IN'], val_array), 

63 (filter_object, ['<=', r'<', r'>=', r'>'], [val_int, val_datetime, 

64 val_duration, val_ago]), 

65 (filter_object, ['=', '!='], [val_duration, val_datetime, val_int, 

66 val_bool, val_str, val_none]), 

67 (filter_object, [r'~=', r'MATCHES', r'ICONTAINS'], val_str), 

68 (filter_object, [r'CONTAINS'], [val_str, val_array]), 

69 (filter_object, [r'MATCHES'], val_subquery)] 

70 

71 

72def orderby_object(): return _(r'-?\w+') 

73 

74 

75def orderby(): return ("ORDER_BY", orderby_object) 

76 

77 

78def limit(): return ("LIMIT", val_int) 

79 

80 

81def factor(): return Optional("NOT"), [basic_filter, ("(", expression, ")")] 

82 

83 

84def expression(): return factor, ZeroOrMore(["AND", "OR"], factor), Optional(orderby), Optional(limit) 

85 

86 

87def query(): return Optional(expression), EOF 

88 

89 

90class QueryVisitor(PTNodeVisitor): 

91 class NoneObject: 

92 pass 

93 

94 def __init__(self, model, ignore_fields=[], *arg, **kwargs): 

95 """ 

96 Args: 

97 ignore_fields (list): List of fields whose filter conditions will be ignored during parsing. 

98 """ 

99 self.model = model 

100 self.orderby = None 

101 self.limit = None 

102 self.ignore_db_paths = [] 

103 for field in ignore_fields: 

104 if obj := self.model.filter_objects_to_db.get(field, {}): 

105 self.ignore_db_paths.append(obj.db_path) 

106 

107 super().__init__(*arg, **kwargs) 

108 

109 def is_m2m(self, path: str) -> bool: 

110 model = self.model 

111 for field_name in path.split("__"): 

112 if model._meta.get_field(field_name).__class__ is ManyToManyField: 

113 return True 

114 if model._meta.get_field(field_name).__class__ is ForeignKey: 

115 model = getattr(model, field_name).field.remote_field.model 

116 continue 

117 break 

118 return False 

119 

120 def visit_val_none(self, node, children): 

121 # HACK: I would have rather returned None, but Arppegio interprets this as 

122 # a <no match>... Instead, return a NoneObject that will later be converted 

123 return QueryVisitor.NoneObject() # pragma: no cover 

124 

125 def visit_val_int(self, node, children): 

126 return FilterObjectInteger.parse_value(node.value) 

127 

128 def visit_val_str(self, node, children): 

129 if len(children) == 0: 

130 return "" 

131 if len(children) > 1: 

132 raise ValueError("val_str cannot have more than one child") # pragma: no cover 

133 return FilterObjectStr.parse_value(children[0]) 

134 

135 def visit_val_bool(self, node, children): 

136 return FilterObjectBool.parse_value(node.value) 

137 

138 def visit_val_datetime(self, node, children): 

139 if len(children) > 1: 

140 raise ValueError("val_datetime cannot have more than one child") # pragma: no cover 

141 return FilterObjectDateTime.parse_value(children[0]) 

142 

143 def visit_val_duration(self, node, children): 

144 if len(children) > 1: 

145 raise ValueError("val_duration cannot have more than one child") # pragma: no cover 

146 return FilterObjectDuration.parse_value(children[0]) 

147 

148 def visit_val_ago(self, node, children): 

149 if len(children) > 1: 

150 raise ValueError("val_ago cannot have more than one child") # pragma: no cover 

151 duration = FilterObjectDuration.parse_value(children[0]) 

152 return timezone.now() - duration 

153 

154 def visit_filter_field(self, node, children): 

155 if '__' in node.value: 

156 raise ValueError("Dict object keys cannot contain the substring '__'") 

157 

158 return node.value 

159 

160 def visit_filter_object(self, node, children): 

161 filter_obj = self.model.filter_objects_to_db.get(children[0]) 

162 if filter_obj is None: 

163 raise ValueError("The object '{}' does not exist".format(children[0])) 

164 

165 if isinstance(filter_obj, FilterObjectJSON): 

166 if len(children) != 2: 

167 raise ValueError("The dict object '{}' requires a key to access its data".format(children[0])) 

168 filter_obj = FilterObjectJSON(filter_obj._db_path, filter_obj.description, children[1]) 

169 elif len(children) != 1: 

170 raise ValueError("The object '{}' cannot have an associated key".format(children[0])) 

171 

172 return filter_obj 

173 

174 def visit_val_array(self, node, children): 

175 return [c for c in children if c != ','] 

176 

177 def visit_val_subquery(self, node, children): 

178 out = "" 

179 for x in list(node): 

180 out += str(x.flat_str()) 

181 return out 

182 

183 def visit_basic_filter(self, node, children): 

184 if len(children) != 3: 

185 raise ValueError("basic_filter: Invalid amount of operands") # pragma: no cover 

186 

187 filter_obj, lookup, item = children 

188 

189 if filter_obj.db_path in self.ignore_db_paths: 

190 return self.emit_empty() 

191 

192 # HACK: see visit_val_none() 

193 if isinstance(item, QueryVisitor.NoneObject): 

194 item = None # pragma: no cover 

195 

196 obj = None 

197 if isinstance(filter_obj, FilterObjectModel): 

198 item = filter_obj.parse_value(item) 

199 obj = self.emit_is_in_operator(filter_obj, item) 

200 else: 

201 lookups_map = { 

202 "<=": self.emit_lte_operator, 

203 ">=": self.emit_gte_operator, 

204 "<": self.emit_lt_operator, 

205 ">": self.emit_gt_operator, 

206 "CONTAINS": ( 

207 self.emit_contains_list_operator if isinstance(item, list) else self.emit_contains_string_operator 

208 ), 

209 "ICONTAINS": self.emit_icontains_operator, 

210 "IS IN": self.emit_is_in_operator, 

211 "NOT IN": self.emit_is_in_operator, 

212 "MATCHES": self.emit_matches_operator, 

213 "~=": self.emit_matches_operator, 

214 "=": self.emit_equal_operator, 

215 "!=": self.emit_equal_operator, 

216 } 

217 if lookup not in lookups_map: 

218 raise ValueError("Unknown lookup '{}'".format(lookup)) # pragma: no cover 

219 obj = lookups_map[lookup](filter_obj, item) 

220 

221 if lookup in ['!=', 'NOT IN']: 

222 return self.emit_not_operator(obj) 

223 else: 

224 return obj 

225 

226 def visit_factor(self, node, children): 

227 if len(children) > 1: 

228 if children[0] == "NOT": 

229 return self.emit_not_operator(children[-1]) 

230 return children[-1] 

231 

232 def visit_orderby_object(self, node, children): 

233 reverse = node.value[0] == '-' 

234 

235 obj_name = node.value if not reverse else node.value[1:] 

236 

237 filter_obj = self.model.filter_objects_to_db.get(obj_name) 

238 if filter_obj is not None: 

239 return "{}{}".format("-" if reverse else "", filter_obj.db_path) 

240 else: 

241 raise ValueError("The object '{}' does not exist".format(obj_name)) 

242 

243 def visit_orderby(self, node, children): 

244 if len(children) == 1: 

245 self.orderby = children[0] 

246 else: 

247 raise ValueError("orderby: Invalid amount of operands") # pragma: no cover 

248 

249 def visit_limit(self, node, children): 

250 if len(children) == 1: 

251 if children[0] < 0: 

252 raise ValueError("Negative limits are not supported") 

253 

254 self.limit = children[0] 

255 else: 

256 raise ValueError("limit: Invalid amount of operands") # pragma: no cover 

257 

258 def visit_expression(self, node, children): 

259 if len(children) >= 1: 

260 result = children[0] 

261 for i in range(2, len(children), 2): 

262 if children[i-1] == "AND": 

263 result = self.emit_and_operator(result, children[i]) 

264 elif children[i-1] == "OR": 

265 result = self.emit_or_operator(result, children[i]) 

266 return result 

267 

268 def visit_query(self, node, children): 

269 if len(children) > 1: 

270 raise ValueError("query cannot have more than one child") # pragma: no cover 

271 elif len(children) == 1: 

272 return children[0] 

273 else: 

274 return self.emit_empty() 

275 

276 def emit_equal_operator(self, filter_obj, item): 

277 raise NotImplementedError() # pragma: no cover 

278 

279 def emit_lte_operator(self, filter_obj, item): 

280 raise NotImplementedError() # pragma: no cover 

281 

282 def emit_lt_operator(self, filter_obj, item): 

283 raise NotImplementedError() # pragma: no cover 

284 

285 def emit_gte_operator(self, filter_obj, item): 

286 raise NotImplementedError() # pragma: no cover 

287 

288 def emit_gt_operator(self, filter_obj, item): 

289 raise NotImplementedError() # pragma: no cover 

290 

291 def emit_contains_string_operator(self, filter_obj, item): 

292 raise NotImplementedError() # pragma: no cover 

293 

294 def emit_contains_list_operator(self, filter_obj, item): 

295 raise NotImplementedError() # pragma: no cover 

296 

297 def emit_icontains_operator(self, filter_obj, item): 

298 raise NotImplementedError() # pragma: no cover 

299 

300 def emit_is_in_operator(self, filter_obj, item): 

301 raise NotImplementedError() # pragma: no cover 

302 

303 def emit_matches_operator(self, filter_obj, item): 

304 raise NotImplementedError() # pragma: no cover 

305 

306 def emit_not_operator(self, x): 

307 raise NotImplementedError() # pragma: no cover 

308 

309 def emit_and_operator(self, x, y): 

310 raise NotImplementedError() # pragma: no cover 

311 

312 def emit_or_operator(self, x, y): 

313 raise NotImplementedError() # pragma: no cover 

314 

315 def emit_empty(self): 

316 raise NotImplementedError() # pragma: no cover 

317 

318 

319class VisitorQ(QueryVisitor): 

320 def __init__(self, model, ignore_fields=[], *arg, **kwargs): 

321 super().__init__(model, ignore_fields=ignore_fields, *arg, **kwargs) 

322 

323 def get_related_model(self, path: str): 

324 model = self.model 

325 for field_name in filter(None, path.split("__")): 

326 model = getattr(model, field_name).field.remote_field.model 

327 return model 

328 

329 def process_compare_operator(self, filter_obj, item, condition_key): 

330 key = filter_obj.db_path 

331 # Intercept lookups on many-to-many fields to allow filtering by multiple values 

332 # (e.g. machine_tag = "A" AND machine_tag = "B") 

333 # NOTE: This is needed to make the Python and the Q parsers behave in the same way 

334 if self.is_m2m(filter_obj.db_path): 

335 # Key has to be split into two separate ones to filter by multiple nested values in relation 

336 # "many-to-many" (example: ts_run__machine__tags__name): 

337 # * "key": path to parent key that is queried over (based on example: ts_run__machine) 

338 # * "subquery_key": relative path to child value key (based on example: tags__name) 

339 key_parts = key.split("__") 

340 key = "__".join(key_parts[:-2]) 

341 subquery_key = "__".join(key_parts[-2:]) + condition_key 

342 objects = self.get_related_model(key).objects 

343 if condition_key == "__exact" and isinstance(item, list): 

344 for value in item: 

345 objects = objects.filter(Q(**{f"{subquery_key}": value})) 

346 else: 

347 objects = objects.filter(Q(**{f"{subquery_key}": item})) 

348 item = objects 

349 key = key or "pk" 

350 condition_key = "__in" 

351 

352 return Q(**{key + condition_key: item}) 

353 

354 def emit_lte_operator(self, filter_obj, item): 

355 return self.process_compare_operator(filter_obj, item, "__lte") 

356 

357 def emit_lt_operator(self, filter_obj, item): 

358 return self.process_compare_operator(filter_obj, item, "__lt") 

359 

360 def emit_gte_operator(self, filter_obj, item): 

361 return self.process_compare_operator(filter_obj, item, "__gte") 

362 

363 def emit_gt_operator(self, filter_obj, item): 

364 return self.process_compare_operator(filter_obj, item, "__gt") 

365 

366 def emit_contains_string_operator(self, filter_obj, item): 

367 return self.process_compare_operator(filter_obj, item, "__contains") 

368 

369 def emit_contains_list_operator(self, filter_obj, item): 

370 return self.process_compare_operator(filter_obj, item, "__exact") 

371 

372 def emit_icontains_operator(self, filter_obj, item): 

373 return self.process_compare_operator(filter_obj, item, "__icontains") 

374 

375 def emit_is_in_operator(self, filter_obj, item): 

376 return self.process_compare_operator(filter_obj, item, "__in") 

377 

378 def emit_matches_operator(self, filter_obj, item): 

379 return self.process_compare_operator(filter_obj, item, "__regex") 

380 

381 def emit_equal_operator(self, filter_obj, item): 

382 return self.process_compare_operator(filter_obj, item, "__exact") 

383 

384 def emit_not_operator(self, x): 

385 return ~x 

386 

387 def emit_and_operator(self, x, y): 

388 return x & y 

389 

390 def emit_or_operator(self, x, y): 

391 return x | y 

392 

393 def emit_empty(self): 

394 return Q() 

395 

396 

397def getnested(attr_path): 

398 def nested_getter(obj): 

399 for attr in attr_path.split('.'): 

400 try: 

401 obj = getattr(obj, attr) 

402 except AttributeError: 

403 obj = obj.get(attr) 

404 if obj is None: 

405 return None 

406 return obj 

407 return nested_getter 

408 

409 

410def compose(f1, f2): 

411 return lambda *args, **kwargs: f1(f2(*args, **kwargs)) 

412 

413 

414def create_and_op(f1, f2): 

415 return lambda x: f1(x) and f2(x) 

416 

417 

418def create_or_op(f1, f2): 

419 return lambda x: f1(x) or f2(x) 

420 

421 

422def function_compare_factory(fn, getter, value): 

423 return lambda x: fn(getter(x), value) 

424 

425 

426class VisitorLocal(QueryVisitor): 

427 def __init__(self, model, ignore_fields=[], *arg, **kwargs): 

428 super().__init__(model, ignore_fields=ignore_fields, *arg, **kwargs) 

429 

430 @staticmethod 

431 def get_list_getter(field_name): 

432 related_field_name = field_name.split(".")[-1] 

433 field_name = ".".join(field_name.split(".")[:-1]) 

434 getter = getnested(field_name) 

435 return lambda x: [getattr(value, related_field_name) for value in getter(x).all()] 

436 

437 def parse_field_name(self, filter_obj): 

438 return filter_obj.db_path.replace("__", ".") 

439 

440 def emit_lte_operator(self, filter_obj, item): 

441 return function_compare_factory(operator.le, getnested(self.parse_field_name(filter_obj)), item) 

442 

443 def emit_lt_operator(self, filter_obj, item): 

444 return function_compare_factory(operator.lt, getnested(self.parse_field_name(filter_obj)), item) 

445 

446 def emit_gte_operator(self, filter_obj, item): 

447 return function_compare_factory(operator.ge, getnested(self.parse_field_name(filter_obj)), item) 

448 

449 def emit_gt_operator(self, filter_obj, item): 

450 return function_compare_factory(operator.gt, getnested(self.parse_field_name(filter_obj)), item) 

451 

452 def emit_contains_string_operator(self, filter_obj, item): 

453 return function_compare_factory(operator.contains, getnested(self.parse_field_name(filter_obj)), item) 

454 

455 def emit_contains_list_operator(self, filter_obj, item): 

456 return function_compare_factory( 

457 lambda x, y: set(y).issubset(set(x)), self.get_list_getter(self.parse_field_name(filter_obj)), item 

458 ) 

459 

460 def emit_icontains_operator(self, filter_obj, item): 

461 return function_compare_factory( 

462 lambda x, y: y.lower() in x.lower(), getnested(self.parse_field_name(filter_obj)), item 

463 ) 

464 

465 def emit_is_in_operator(self, filter_obj, item): 

466 if self.is_m2m(filter_obj.db_path): 

467 return function_compare_factory( 

468 lambda x, y: any([val in x for val in y]), self.get_list_getter(self.parse_field_name(filter_obj)), item 

469 ) 

470 return function_compare_factory(lambda x, y: x in y, getnested(self.parse_field_name(filter_obj)), item) 

471 

472 def emit_matches_operator(self, filter_obj, item): 

473 return function_compare_factory( 

474 lambda x, y: bool(re.search(y, x)), getnested(self.parse_field_name(filter_obj)), item 

475 ) 

476 

477 def emit_equal_operator(self, filter_obj, item): 

478 if self.is_m2m(filter_obj.db_path): 

479 return function_compare_factory( 

480 operator.contains, self.get_list_getter(self.parse_field_name(filter_obj)), item 

481 ) 

482 return function_compare_factory(operator.eq, getnested(self.parse_field_name(filter_obj)), item) 

483 

484 def emit_not_operator(self, x): 

485 return compose(operator.not_, x) 

486 

487 def emit_and_operator(self, x, y): 

488 return create_and_op(x, y) 

489 

490 def emit_or_operator(self, x, y): 

491 return create_or_op(x, y) 

492 

493 def emit_empty(self): 

494 return lambda _: True 

495 

496 

497class QueryParserPython: 

498 def __init__(self, model, user_query, ignore_fields: list[str] = []): 

499 self.model = model 

500 self.user_query = user_query 

501 self.error = None 

502 self.matching_fn: Callable[[Any], bool] 

503 

504 try: 

505 parser = ParserPython(query) 

506 parse_tree = parser.parse(user_query) 

507 query_visitor = VisitorLocal(model, ignore_fields=ignore_fields) 

508 self.matching_fn = visit_parse_tree(parse_tree, query_visitor) 

509 except (ValueError, NoMatch) as err: 

510 self.error = str(err) 

511 

512 @property 

513 def is_valid(self): 

514 return self.error is None 

515 

516 

517class QueryParser: 

518 def __init__(self, model, user_query, ignore_fields: list[str] = []): 

519 self.model = model 

520 self.user_query = user_query 

521 

522 self.error = None 

523 self.q_objects = Q() 

524 self.orderby = None 

525 self.limit = None 

526 

527 try: 

528 parser = ParserPython(query) 

529 parse_tree = parser.parse(self.user_query) 

530 query_visitor = VisitorQ(self.model, ignore_fields=ignore_fields) 

531 

532 self.q_objects = visit_parse_tree(parse_tree, query_visitor) 

533 self.orderby = query_visitor.orderby 

534 self.limit = query_visitor.limit 

535 except ValueError as e: 

536 self.error = str(e) 

537 except NoMatch as e: 

538 self.error = str(e) 

539 

540 @property 

541 def query_key(self): 

542 return Shortener.get_or_create(self.user_query).shorthand 

543 

544 @property 

545 def is_valid(self): 

546 return self.error is None 

547 

548 @property 

549 def is_empty(self): 

550 return not self.is_valid or len(self.user_query) == 0 

551 

552 @cached_property 

553 def objects(self): 

554 if self.is_valid: 

555 query = self.model.objects.filter(self.q_objects).distinct() 

556 query = query.order_by(self.orderby) if self.orderby is not None else query 

557 return query[:self.limit] if self.limit is not None else query 

558 else: 

559 return self.model.objects.none() 

560 

561 

562class LegacyParser: 

563 userfilters_allowed_lookups = {'exact': '=', 'in': 'IS IN', 'regex': '~=', 'contains': 'CONTAINS', 

564 'icontains': 'ICONTAINS', 'gt': '>', 'gte': '>=', 'lt': '<', 'lte': '<='} 

565 userfilters_allowed_types = ['str', 'int', 'bool', 'datetime', 'duration'] 

566 

567 def __init__(self, model, **user_filters): 

568 # Filters should all be of the following format: 

569 # (only|exclude)__(object)__(in|regex|gt|lt) = str or format(value) 

570 lookups = "|".join(self.userfilters_allowed_lookups.keys()) 

571 format_re = re.compile((r'(?P<action>(only|exclude))__(?P<object>\w+)__' 

572 '(?P<lookup>({lookups}))'.format(lookups=lookups))) 

573 

574 # Iterate through the user filters, match them to our format regex, 

575 # then construct the right ORM call 

576 only = [] 

577 exclude = [] 

578 for key, item in user_filters.items(): 

579 match = format_re.match(key) 

580 if match: 

581 fields = match.groupdict() 

582 

583 db_object = model.filter_objects_to_db.get(fields['object']) 

584 if db_object is None: 

585 continue 

586 

587 # aggregate all regular expressions into one request 

588 if fields['lookup'] == 'regex' and isinstance(item, list) and len(item) > 1: 

589 item = r'('+'|'.join(item)+')' 

590 

591 # Try converting the item to the right unit 

592 item = self._convert_user_values(item) 

593 

594 bfilter = "{} {} {}".format(fields['object'], 

595 self.userfilters_allowed_lookups.get(fields['lookup']), 

596 item) 

597 

598 if fields['action'] == 'only': 

599 only.append(bfilter) 

600 else: 

601 exclude.append(bfilter) 

602 

603 self.query = " AND ".join(only) 

604 if len(exclude) > 0: 

605 if len(only) > 0: 

606 self.query += ' AND ' 

607 self.query += "NOT ({})".format(" AND ".join(exclude)) 

608 

609 @classmethod 

610 def _convert_user_value(cls, value): 

611 # Will automatically be cached by python 

612 types = "|".join(cls.userfilters_allowed_types) 

613 item_re = re.compile(r'(?P<type>({types}))\((?P<value>.*)\)'.format(types=types)) 

614 

615 match = item_re.match(value) 

616 if match: 

617 fields = match.groupdict() 

618 

619 try: 

620 if fields['type'] == 'str': 

621 return "'{}'".format(fields['value']) 

622 elif fields['type'] == 'bool': 

623 return "TRUE" if FilterObjectBool.parse_value(fields['value']) else "FALSE" 

624 elif fields['type'] == 'int': 

625 return fields['value'] 

626 elif fields['type'] == 'datetime' or fields['type'] == 'duration': 

627 return value 

628 except Exception: # pragma: no cover 

629 traceback.print_exc() # pragma: no cover 

630 

631 # Default to the variable being a string 

632 return "'" + value + "'" 

633 

634 @classmethod 

635 def _convert_user_values(cls, items): 

636 # detect whether we have a singular value or a list 

637 if isinstance(items, list): 

638 if len(items) > 1: 

639 new = [] 

640 for item in items: 

641 new.append(cls._convert_user_value(item)) 

642 return "[" + ", ".join(new) + "]" 

643 else: 

644 return cls._convert_user_value(items[0]) 

645 else: 

646 return cls._convert_user_value(items) 

647 

648 

649class UserFiltrableMixin: 

650 @classmethod 

651 def _get_value_from_params(cls, user_filters, key): 

652 val = user_filters.get(key) 

653 if isinstance(val, list) and len(val) == 1: 

654 val = val[0] 

655 return val 

656 

657 @classmethod 

658 def from_user_filters(cls, prefix=None, **user_filters): 

659 query_param_name = f'{prefix}_query' if prefix is not None else 'query' 

660 query = cls._get_value_from_params(user_filters, query_param_name) 

661 if query is None: 

662 query_key = cls._get_value_from_params(user_filters, f'{query_param_name}_key') 

663 short = Shortener.objects.filter(shorthand=query_key).first() 

664 if short is not None: 

665 query = short.full 

666 

667 if query is not None: 

668 return QueryParser(cls, query) 

669 else: 

670 query = LegacyParser(cls, **user_filters).query 

671 return QueryParser(cls, query) 

672 

673 

674class FilterObject: 

675 def __init__(self, db_path, description=None): 

676 self._db_path = db_path 

677 self._description = description 

678 

679 @property 

680 def db_path(self): 

681 return self._db_path 

682 

683 @property 

684 def description(self): 

685 if self._description is None: 

686 return "<no description yet>" 

687 else: 

688 return self._description 

689 

690 

691class FilterObjectJSON(FilterObject): 

692 data_type = "anything" 

693 documentation = "Expected format: <JSON field>.<key>" 

694 test_value = "test" 

695 

696 def __init__(self, db_path, description=None, key=None): 

697 self.key = key 

698 super().__init__(db_path, description) 

699 

700 @property 

701 def db_path(self): 

702 if self.key is None: 

703 raise ValueError("Dict field require a key to be accessed") # pragma: no cover 

704 return "{}__{}".format(self._db_path, self.key) 

705 

706 

707class FilterObjectStr(FilterObject): 

708 data_type = "string" 

709 documentation = "Expected format: anything. Use quotes for the new query language (\"\" or ''). " \ 

710 "Escape quotes by placing '\\' before quote character." 

711 test_value = "str_test" 

712 

713 def __init__(self, db_path, description=None): 

714 super().__init__(db_path, description) 

715 

716 @classmethod 

717 def parse_value(cls, value): 

718 return str(value) 

719 

720 

721class FilterObjectDateTime(FilterObject): 

722 data_type = "datetime" 

723 documentation = "Expected format: datetime(YYYY-MM-DD HH:MM[:ss[.uuuuuu]][TZ])" 

724 test_value = "2019-01-01" 

725 

726 def __init__(self, db_path, description=None): 

727 super().__init__(db_path, description) 

728 

729 @classmethod 

730 def parse_value(cls, value): 

731 return timezone.make_aware(datetimeparser.parse(value), pytz.utc) 

732 

733 

734class FilterObjectDuration(FilterObject): 

735 data_type = "duration" 

736 documentation = 'Expected format: "duration(DD HH:MM:SS.uuuuuu)", or "duration(P4DT1H15M20S)" (ISO 8601), ' \ 

737 'or "duration(3 days 04:05:06)" (PostgreSQL).' 

738 test_value = "123.456 seconds" 

739 

740 def __init__(self, db_path, description=None): 

741 super().__init__(db_path, description) 

742 

743 @classmethod 

744 def parse_value(cls, value): 

745 duration = parse_duration(value) 

746 if duration is None: 

747 raise ValueError("The value '{}' does not represent a duration. {}".format(value, cls.documentation)) 

748 return duration 

749 

750 

751class FilterObjectBool(FilterObject): 

752 data_type = "boolean" 

753 documentation = "Supported values: bool(false)/bool(0) or bool(true)/bool(1). " \ 

754 "Use TRUE or FALSE for the new query language." 

755 test_value = "True" 

756 

757 def __init__(self, db_path, description=None): 

758 super().__init__(db_path, description) 

759 

760 @classmethod 

761 def parse_value(cls, value): 

762 return str(value).lower() in ["1", "true"] 

763 

764 

765class FilterObjectInteger(FilterObject): 

766 data_type = "integer" 

767 documentation = "Supported values: int(12345). Use 12345 for the new query language." 

768 test_value = 12345 

769 

770 def __init__(self, db_path, description=None): 

771 super().__init__(db_path, description) 

772 

773 @classmethod 

774 def parse_value(cls, value): 

775 return int(float(value)) 

776 

777 

778class FilterObjectModel(FilterObject): 

779 data_type = "subquery" 

780 documentation = "Expected format: Any query compatible with the model selected" 

781 

782 def __init__(self, model, db_path, description=None): 

783 self.model = model 

784 super().__init__(db_path, description) 

785 

786 def parse_value(self, value): 

787 result = QueryParser(self.model, value) 

788 if not result.is_valid: 

789 raise ValueError(result.error) 

790 return result.objects 

791 

792 

793class QueryCreator: 

794 def __init__(self, request, Model, prefix=None, default_query_parameters={}): 

795 self.request = request 

796 self.Model = Model 

797 self.prefix = prefix 

798 self.default_query_parameters = default_query_parameters 

799 

800 def __create_query_from_filters(self, **requested_filters): 

801 query = self.Model.from_user_filters(self.prefix, **requested_filters) 

802 if len(query.user_query) > 0: 

803 if not query.is_valid and self.request: 

804 messages.error(self.request, "Filtering error: " + query.error) 

805 return query 

806 return None 

807 

808 def __build_query_string(self): 

809 op_mappings = { 

810 'string': 'MATCHES', 

811 'datetime': 'MATCHES', 

812 'integer': 'MATCHES', 

813 } 

814 query_str = "" 

815 for obj in self.Model.filter_objects_to_db: 

816 param_value = self.request.GET.get(f'{self.prefix}_{obj}' if self.prefix else obj) 

817 if param_value: 

818 data_type = self.Model.filter_objects_to_db[obj].data_type 

819 if len(query_str) > 0: 

820 query_str += " AND " 

821 query_str += f"{obj} {op_mappings[data_type]} '{param_value}'" 

822 return query_str 

823 

824 def string_to_query(self, query_string): 

825 query_param_name = f'{self.prefix}_query' if self.prefix else 'query' 

826 query_dict = QueryDict('', mutable=True) 

827 query_dict.update({f'{query_param_name}': query_string}) 

828 query = self.__create_query_from_filters(**query_dict) 

829 if query: 

830 return query 

831 return self.Model.from_user_filters(**self.default_query_parameters) 

832 

833 def request_to_query(self): 

834 for params in [self.request.POST, self.request.GET]: 

835 # convert the user filters to a normal dictionary to prevent issues when 

836 # inserting new values 

837 requested_filters = params.copy() 

838 query = self.__create_query_from_filters(**requested_filters) 

839 if query: 

840 return query 

841 

842 return self.Model.from_user_filters(**self.default_query_parameters) 

843 

844 def multiple_request_params_to_query(self): 

845 query_str = self.__build_query_string() 

846 return self.string_to_query(query_str)