Source code for graphene_elastic.filter_backends.search.common

import operator
from copy import copy, deepcopy
from collections import ChainMap

import graphene
from graphene.types.field import source_resolver
import six

from anysearch.search_dsl.query import Q
from stringcase import pascalcase as to_pascal_case

from ..base import BaseBackend
from ...constants import (
    DYNAMIC_CLASS_NAME_PREFIX,
    ALL,
    FIELD,
    VALUE,
    BOOST,
)

__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2019-2022 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = ("SearchFilterBackend",)

# Module-wide counter. ``SearchFilterBackend.get_field_type`` increments it
# every time it creates a dynamic input class and appends the new value to
# the generated class name.
nested_input_count = 0


class SearchFilterBackend(BaseBackend):
    """Search filter backend.

    The searchable fields are read from the ``filter_backend_options``
    attribute of the node's ``_meta`` (keys ``search_fields`` and
    ``search_nested_fields``).
    """

    prefix = "search"
    has_query_fields = True

    @property
    def search_fields(self):
        """Return a deep copy of the configured ``search_fields`` option.

        An empty dict is returned when the option is not configured.
        """
        node_meta = self.connection_field.type._meta.node._meta
        backend_options = getattr(node_meta, "filter_backend_options", {})
        return deepcopy(backend_options.get("search_fields", {}))

    @property
    def search_nested_fields(self):
        """Return a deep copy of the configured ``search_nested_fields`` option.

        An empty dict is returned when the option is not configured.
        """
        node_meta = self.connection_field.type._meta.node._meta
        backend_options = getattr(node_meta, "filter_backend_options", {})
        return deepcopy(backend_options.get("search_nested_fields", {}))

    @property
    def search_args_mapping(self):
        """Map every search field name onto itself."""
        return {name: name for name, _ in self.search_fields.items()}

    @property
    def nested_search_args_mapping(self):
        """Map every nested search field name onto itself."""
        return {name: name for name, _ in self.search_nested_fields.items()}

[docs] def field_belongs_to(self, field_name): """Check if given filter field belongs to the backend. :param field_name: :return: """ return ( field_name in self.search_fields or field_name in self.search_nested_fields )
[docs] def get_backend_default_query_fields_params(self): """Get backend default filter params. :rtype: dict :return: """ return {ALL: graphene.String()}
    def get_search_nested_fields_tree(self, start=None, value=None):
        """Turn the flat ``search_nested_fields`` option into a nested tree.

        Given prepared nested fields such as::

            {
                'country': {
                    'path': 'country',
                    'fields': [{'name': {'boost': 2}}]
                },
                'city': {
                    'path': 'country.city',
                    'fields': [{'name': {'boost': 2}}]
                }
            }

        the result is::

            {
                'country': {
                    'name': None,  # leaf, holds a deep copy of ``value``
                    'city': {
                        'name': None
                    }
                }
            }

        :param start: Optional path prefix. When truthy, entries whose
            ``path`` does not start with it are skipped.
        :param value: Object deep-copied into every leaf (field name) of the
            tree. Defaults to ``None``.
        :return: The nested tree.
        :rtype: dict
        """
        source = self.search_nested_fields
        # Reverse lookup: dotted path -> name of the nested field declared
        # for that path.
        path_field_mapping = {
            option["path"]: field for field, option in source.items()
        }
        tree = {}

        for field, option in source.items():
            if start and not option["path"].startswith(start):
                continue

            splited_path = option["path"].split(".")
            inserted = False
            node = {}
            # Leaves of this node: a dict entry contributes its first key,
            # a string entry contributes itself.
            for f in option.get("fields", []):
                if isinstance(f, dict):
                    node.update({list(f.keys())[0]: deepcopy(value)})
                elif isinstance(f, six.string_types):
                    node.update({f: deepcopy(value)})

            # Find direct sub-path items (exactly one segment deeper) that
            # are already at the top of the tree and move them inside this
            # node.
            for _path, _field in path_field_mapping.items():
                _splited_path = _path.split(".")
                if (
                    _path.startswith(option["path"])
                    and len(_splited_path) - len(splited_path) == 1
                    and _field in tree
                ):
                    node.update({_field: tree.pop(_field)})

            # Find an item which contains this node (a strictly shorter path
            # that prefixes ours) and put this node inside it.
            for _path, _field in path_field_mapping.items():
                _splited_path = _path.split(".")
                if (
                    option["path"].startswith(_path)
                    and len(splited_path) - len(_splited_path) > 0
                ):
                    # Note: because we cannot be sure the whole path has
                    # already been built, walk the tree along the path
                    # segments that do exist to find the proper place for
                    # this node.
                    t = tree
                    for __splited in splited_path[:-1]:
                        if __splited in t:
                            t = t[__splited]
                    # Only attach when the walk actually descended below
                    # the root.
                    if t != tree:
                        t.update({field: node})
                        inserted = True
                        break

            if not inserted:
                # No other node can contain this node (yet): keep it at the
                # top level.
                tree.update({field: node})
        return tree

[docs] def get_field_type(self, field_name, field_value, base_field_type): """Get field type. :return: TODO: required """ def get_graphene_argument_type(name, params): global nested_input_count nested_input_count += 1 return graphene.Argument( type( "{}{}{}{}{}".format( DYNAMIC_CLASS_NAME_PREFIX, to_pascal_case(self.prefix), self.connection_field.type.__name__, to_pascal_case(name), str(nested_input_count), ), (graphene.InputObjectType,), params, ) ) def dfs(root, root_field_type): ret = {} for name, node in root.items(): if isinstance(node, dict): params = self.get_backend_default_query_fields_params() params.update( dfs(node, root_field_type._meta.fields.get(name)) ) ret.update({name: get_graphene_argument_type(name, params)}) elif not node: if hasattr(root_field_type, "_meta"): fields = root_field_type._meta.fields else: fields = root_field_type._type._of_type._meta.fields params = { VALUE: fields.get(name), BOOST: graphene.Int(), } ret.update({name: get_graphene_argument_type(name, params)}) return ret if field_name in self.search_fields: params = { VALUE: base_field_type, # Value to search on. Required. BOOST: graphene.Int(), # Boost the given field with. Optional. } return get_graphene_argument_type(field_name, params) elif field_name in self.search_nested_fields: params = self.get_backend_default_query_fields_params() tree = self.get_search_nested_fields_tree().get(field_name) params.update(dfs(tree, base_field_type)) return get_graphene_argument_type(field_name, params)
[docs] def prepare_search_fields(self): """Prepare search fields. Possible structures: search_fields = { 'title': {'boost': 4, 'field': 'title.raw'}, 'content': {'boost': 2}, 'category': None, 'comments': None } We shall finally have: search_fields = { 'title': { 'field': 'title.raw', 'boost': 4 }, 'content': { 'field': 'content', 'boost': 2 }, 'category': { 'field': 'category' } } Sample query would be: { allPostDocuments(search:{query:"Another"}) { pageInfo { startCursor endCursor hasNextPage hasPreviousPage } edges { cursor node { category title content numViews } } } } :return: Filtering options. :rtype: dict """ filter_args = dict(self.args).get(self.prefix) if not filter_args: return {} filter_fields = {} # {'query': '', 'title': {'query': '', 'boost': 1}} for field, _ in self.search_args_mapping.items(): filter_fields.update({field: {}}) options = self.search_fields.get(field) # For constructions like 'category': 'category.raw' we shall # have the following: # if options is None or isinstance(options, six.string_types): filter_fields.update({field: {"field": options or field}}) elif "field" not in options: filter_fields.update({field: options}) filter_fields[field]["field"] = field else: filter_fields.update({field: options}) return filter_fields
[docs] def prepare_search_nested_fields(self): """Prepare search fields. Possible structures: Type1 search_nested_fields = { 'comments': { 'path'; 'comments', 'fields': [ {'author': {'boost': 4}}, {'tag': {'boost': 2}}, ] } } Type2 search_nested_fields = { 'comments: { 'path'; 'comments', 'fields': ['author', 'tag'] } } We shall finally have: search_nested_fields = { 'comments': { 'path': 'comments', 'fields': { {'author': {'boost': 4}}, {'tag': {'boost': 2}} } } } Sample query would be: { allPostDocuments(search:{query:"Another"}) { pageInfo { startCursor endCursor hasNextPage hasPreviousPage } edges { cursor node { category title content numViews } } } } :return: Filtering options. :rtype: dict """ filter_args = dict(self.args).get(self.prefix) if not filter_args: return {} filter_fields = {} search_nested_fields = self.search_nested_fields # {'query': '', 'title': {'query': '', 'boost': 1}} for field, _ in self.nested_search_args_mapping.items(): filter_fields.update({field: {}}) options = deepcopy(search_nested_fields.get(field, {})) if "fields" not in options: options["fields"] = [] fields = [] for _field in options["fields"]: if isinstance(_field, six.string_types): fields.append({_field: {"field": _field}}) elif isinstance(_field, dict): fields.append(_field) options["fields"] = fields filter_fields.update({field: options}) return filter_fields
[docs] def get_all_query_params(self): filter_args = dict(self.args).get(self.prefix) if not filter_args: return {} return filter_args
[docs] def clean_all_query_params(self): """ Get cleaned query params. Remove query lookup. """ def _recursive_remove_query_lookup(params): root = deepcopy(params) for lookup, value in root.items(): if isinstance(value, dict): params[lookup].pop(ALL, None) _recursive_remove_query_lookup(params[lookup]) all_query_params = deepcopy(self.get_all_query_params()) _recursive_remove_query_lookup(all_query_params) return all_query_params
[docs] def filter(self, queryset): """Filter. :param queryset: :return: """ _queries = [] _search = self.construct_search() if _search: _queries.extend(_search) _nested_search = self.construct_nested_search() if _nested_search: _queries.extend(_nested_search) if _queries: queryset = queryset.query("bool", should=_queries) return queryset