Source code for graphene_elastic.filter_backends.filtering.common

from copy import deepcopy
import graphene
from stringcase import pascalcase as to_pascal_case

from ..base import BaseBackend
from ...constants import (
    ALL_LOOKUP_FILTERS_AND_QUERIES,
    DYNAMIC_CLASS_NAME_PREFIX,
    LOOKUP_FILTER_EXISTS,
    LOOKUP_FILTER_PREFIX,
    LOOKUP_FILTER_RANGE,
    LOOKUP_FILTER_TERM,
    LOOKUP_FILTER_TERMS,
    LOOKUP_FILTER_WILDCARD,
    LOOKUP_QUERY_CONTAINS,
    LOOKUP_QUERY_ENDSWITH,
    LOOKUP_QUERY_EXCLUDE,
    LOOKUP_QUERY_GT,
    LOOKUP_QUERY_GTE,
    LOOKUP_QUERY_IN,
    LOOKUP_QUERY_ISNULL,
    LOOKUP_QUERY_LT,
    LOOKUP_QUERY_LTE,
    LOOKUP_QUERY_STARTSWITH,
    VALUE,
)
from .mixins import FilteringFilterMixin
from .queries import LOOKUP_FILTER_MAPPING

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2019-2022 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = ("FilteringFilterBackend",)


[docs]class FilteringFilterBackend(BaseBackend, FilteringFilterMixin): """Filtering filter backend.""" prefix = "filter" has_query_fields = True @property def filter_fields(self): """Filtering filter fields.""" filter_fields = getattr( self.connection_field.type._meta.node._meta, 'filter_backend_options', {} ).get('filter_fields', {}) return deepcopy(filter_fields) @property def filter_args_mapping(self): return {field: field for field, value in self.filter_fields.items()}
[docs] def field_belongs_to(self, field_name): """Check if given filter field belongs to the backend. :param field_name: :return: """ return field_name in self.filter_fields
[docs] def get_backend_query_fields(self, items, is_filterable_func, get_type_func): """Fail proof override. :param items: :param is_filterable_func: :param get_type_func: :return: """ if not self.filter_fields: return {} return super(FilteringFilterBackend, self).get_backend_query_fields( items=items, is_filterable_func=is_filterable_func, get_type_func=get_type_func )
[docs] def get_field_type(self, field_name, field_value, base_field_type): """Get field type. :return: """ if not self.filter_fields: return None field_options = self.get_field_options(field_name) if isinstance(field_options, dict) and "type" in field_options: if field_options["type"] in ("nested", "object"): return self.get_nested_field_type( field_name, field_value, base_field_type, field_options ) if isinstance(field_options, dict) and "lookups" in field_options: lookups = field_options.get("lookups", []) else: lookups = list(ALL_LOOKUP_FILTERS_AND_QUERIES) params = {VALUE: base_field_type} for lookup in lookups: query_cls = LOOKUP_FILTER_MAPPING.get(lookup) if not query_cls: continue params.update({lookup: query_cls()}) return graphene.Argument( type( "{}{}{}{}".format( DYNAMIC_CLASS_NAME_PREFIX, to_pascal_case(self.prefix), self.connection_field.type.__name__, to_pascal_case(field_name) ), (graphene.InputObjectType,), params, ) )
[docs] def get_nested_field_type( self, field_name, field_value, base_field_type, field_options ): params = {} for sub_field_name in field_options.get("properties", []): _field_name = "{}.{}".format(field_name, sub_field_name) _field_type = self.get_field_type( _field_name, field_value, base_field_type) _field_type.__name__ = "{}{}{}{}".format( DYNAMIC_CLASS_NAME_PREFIX, to_pascal_case(self.prefix), self.connection_field.type.__name__, to_pascal_case(_field_name.replace(".", "_")) ) params.update({sub_field_name: _field_type}) return graphene.Argument( type( "{}{}{}{}".format( DYNAMIC_CLASS_NAME_PREFIX, to_pascal_case(self.prefix), self.connection_field.type.__name__, to_pascal_case(field_name.replace(".", "_")) ), (graphene.InputObjectType,), params, ) )
# def get_field_options(self, field_name): # """Get field options.""" # if field_name in self.filter_fields: # return self.filter_fields[field_name] # return {}
[docs] def prepare_filter_fields(self): """Prepare filter fields Assume that we have a document like this. ```python class Comment(InnerDoc): author = Text(fields={'raw': Keyword()}) content = Text(analyzer='snowball') created_at = Date() def age(self): return datetime.now() - self.created_at class Post(Document): title = Text() title_suggest = Completion() created_at = Date() published = Boolean() category = Text( analyzer=html_strip, fields={'raw': Keyword()} ) comments = Nested(Comment) ``` Possible structures: filter_fields = { 'title': { 'type': 'normal|object|nested', 'field': 'title' # custom field name 'lookups': [ ... # custom lookup list ], 'default_lookup': ... # custom default lookup }, 'created_at': { 'type': 'normal', 'field': 'created_at', 'lookups': [ LOOKUP_FILTER_RANGE # only range ] }, 'published': LOOKUP_FILTER_TERM # treated as default lookup ... } We shall finally have: filter_fields = { 'title': { 'type': 'normal', 'field': 'title.raw', 'lookups': [ ... ], 'default_lookup': ... }, ... # any else fields indexed of this document 'comments': { 'type': 'nested', 'properties': { 'author': { 'type': 'normal', 'field': 'comments.author', ... }, 'content': { 'type': 'normal', 'field': 'comments.content', ... } } } } """ def _recursive_correct_filter_fields( filter_fields, root_field=None, is_nested=False ): # TODO: Generate complete filter_fields data = {} for field_name, field_options in filter_fields.items(): data[field_name] = {} default_lookup, lookups = None, None if isinstance(field_options, str): field = field_options field_type = "normal" elif isinstance(field_options, dict): field = field_options.get("field", field_name) default_lookup = field_options.get("default_lookup") lookups = field_options.get("lookups") field_type = field_options.get("type", "normal") else: raise TypeError( "Field option must be type of str or dict.") if lookups is None: lookups = ALL_LOOKUP_FILTERS_AND_QUERIES if default_lookup is None: default_lookup = lookups[0] field = "{}.{}".format( root_field, field) if root_field else field data[field_name]["field"] = field data[field_name]["type"] = field_type if field_type == "normal": data[field_name]["lookups"] = lookups data[field_name]["default_lookup"] = default_lookup else: data[field_name]["properties"] = \ _recursive_correct_filter_fields( field_options["properties"], root_field=field, is_nested=field_type == "nested" ) if is_nested: data[field_name]["path"] = root_field return data return _recursive_correct_filter_fields(self.filter_fields)
[docs] def prepare_query_params(self): """Prepare query params. :return: """ filter_args = dict(self.args).get(self.prefix) if not filter_args: return {} query_params = {} for arg, filters in filter_args.items(): field = self.filter_args_mapping.get(arg, None) if field is None: continue query_params[field] = filters return query_params
[docs] def get_field_lookup_param(self, field_name): """Get field lookup param. :param field_name: :return: """ field_options = dict(self.args) \ .get(self.prefix, {}) \ .get(field_name, {}) return field_options.get("lookup", None)
[docs] def get_field_options(self, field_name, filter_fields=None): """默认从Node中配置中读取,如果没有则从document中读取 可能的字段名: 1. author 2. comments.author 3. comments.author.name """ if filter_fields is None: filter_fields = self.filter_fields search_path = field_name.split(".") data = deepcopy(filter_fields) for p in search_path: if "properties" in data: data = data["properties"] data = data.get(p, {}) return data or None
[docs] def get_filter_query_params(self): """Get query params to be filtered on. We can either specify it like this: query_params = { 'category': { 'value': 'Elastic', } } Or using specific lookup: query_params = { 'category': { 'term': 'Elastic', 'range': { 'lower': Decimal('3.0') } } } Note, that `value` would only work on simple types (string, integer, decimal). For complex types you would have to use complex param anyway. Therefore, it should be forbidden to set `default_lookup` to a complex field type. Sample values: query_params = { 'category': { 'value': 'Elastic', }, 'comments': { 'author': { 'name': { 'value': 'Elastic' } } } } filter_fields = { 'category': { 'field': 'category.raw', 'type': 'normal', 'default_lookup': 'term', 'lookups': ( 'term', 'terms', 'range', 'exists', 'prefix', 'wildcard', 'contains', 'in', 'gt', 'gte', 'lt', 'lte', 'starts_with', 'ends_with', 'is_null', 'exclude' ) }, 'comments': { 'field': 'comments', 'type': 'nested', 'properties': { 'author': { 'type': 'object', 'path': 'comments.author', 'properties': { 'name': { 'type': 'normal', 'field': 'author.name', 'default_lookups': 'term', 'lookups': ( ... ) } } } } } } field_name = 'category' { 'inventory_type': {'value': '1'}, 'spu': { 'supplier_entityms_entity_id': { 'contains': 'Elastic' }, 'brand': { 'code': { 'term': 'Elastic' } } } } """ query_params = self.prepare_query_params() # Shall be fixed filter_query_params = [] filter_fields = self.prepare_filter_fields() # Correct def _recursive_get_lookup_param_options( query_dict: dict, predefined_filter_fields: dict, ret=None ): """In-depth traversal of the tree dict to generate a query list.""" filter_fields = deepcopy(predefined_filter_fields) for field_name, lookup_params in query_dict.items(): if field_name not in filter_fields: continue field_options = filter_fields[field_name] field_type = field_options["type"] if field_type in ("nested", "object"): _recursive_get_lookup_param_options( query_dict=lookup_params, predefined_filter_fields=filter_fields[ field_name ]["properties"], ret=ret ) else: valid_lookup = field_options.get("lookups", ()) default_lookup = field_options.get("default_lookup", None) for lookup_param, lookup_options in lookup_params.items(): lookup = None if lookup_param is VALUE: lookup = default_lookup elif lookup_param in valid_lookup: lookup = lookup_param if lookup_options is not None: ret.append({ "lookup": lookup, "values": lookup_options, "path": field_options.get("path"), "field": field_options.get( "field", field_name ), "type": self.doc_type.mapping.properties.name, }) _recursive_get_lookup_param_options( query_dict=query_params, predefined_filter_fields=filter_fields, ret=filter_query_params ) return filter_query_params
[docs] def filter(self, queryset): """Filter.""" filter_query_params = self.get_filter_query_params() for filter_query in filter_query_params: # For all other cases, when we don't have multiple values, # we follow the normal flow. if filter_query["lookup"] == LOOKUP_FILTER_TERMS: queryset = self.apply_filter_terms( queryset, filter_query, filter_query['values'] ) # `prefix` filter lookup elif filter_query["lookup"] in ( LOOKUP_FILTER_PREFIX, LOOKUP_QUERY_STARTSWITH, ): queryset = self.apply_filter_prefix( queryset, filter_query, filter_query['values'] ) # `range` filter lookup elif filter_query["lookup"] == LOOKUP_FILTER_RANGE: queryset = self.apply_filter_range( queryset, filter_query, filter_query['values'] ) # `exists` filter lookup elif filter_query["lookup"] == LOOKUP_FILTER_EXISTS: queryset = self.apply_query_exists( queryset, filter_query, filter_query['values'] ) # `wildcard` filter lookup elif filter_query["lookup"] == LOOKUP_FILTER_WILDCARD: queryset = self.apply_query_wildcard( queryset, filter_query, filter_query['values'] ) # `contains` filter lookup elif filter_query["lookup"] == LOOKUP_QUERY_CONTAINS: queryset = self.apply_query_contains( queryset, filter_query, filter_query['values'] ) # `in` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_IN: queryset = self.apply_query_in( queryset, filter_query, filter_query['values'] ) # `gt` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_GT: queryset = self.apply_query_gt( queryset, filter_query, filter_query['values'] ) # `gte` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_GTE: queryset = self.apply_query_gte( queryset, filter_query, filter_query['values'] ) # `lt` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_LT: queryset = self.apply_query_lt( queryset, filter_query, filter_query['values'] ) # `lte` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_LTE: queryset = self.apply_query_lte( queryset, filter_query, filter_query['values'] ) # `endswith` filter lookup elif filter_query["lookup"] == LOOKUP_QUERY_ENDSWITH: queryset = self.apply_query_endswith( queryset, filter_query, filter_query['values'] ) # `isnull` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_ISNULL: queryset = self.apply_query_isnull( queryset, filter_query, filter_query['values'] ) # `exclude` functional query lookup elif filter_query["lookup"] == LOOKUP_QUERY_EXCLUDE: queryset = self.apply_query_exclude( queryset, filter_query, filter_query['values'] ) # `term` filter lookup. This is default if no `default_lookup` # filter_query has been given or explicit lookup provided. else: queryset = super( FilteringFilterBackend, self ).apply_filter_term( queryset, filter_query, filter_query['values'] ) return queryset