import operator
import graphene
from elasticsearch_dsl.query import Q
import six
from ..base import BaseBackend
from ...constants import (
ALL_LOOKUP_FILTERS_AND_QUERIES,
DYNAMIC_CLASS_NAME_PREFIX,
FALSE_VALUES,
LOOKUP_FILTER_EXISTS,
LOOKUP_FILTER_PREFIX,
LOOKUP_FILTER_RANGE,
LOOKUP_FILTER_TERM,
LOOKUP_FILTER_TERMS,
LOOKUP_FILTER_WILDCARD,
LOOKUP_QUERY_CONTAINS,
LOOKUP_QUERY_ENDSWITH,
LOOKUP_QUERY_EXCLUDE,
LOOKUP_QUERY_GT,
LOOKUP_QUERY_GTE,
LOOKUP_QUERY_IN,
LOOKUP_QUERY_ISNULL,
LOOKUP_QUERY_LT,
LOOKUP_QUERY_LTE,
LOOKUP_QUERY_STARTSWITH,
TRUE_VALUES,
VALUE,
LOWER,
UPPER,
GTE,
LTE,
BOOST,
)
from .queries import LOOKUP_FILTER_MAPPING
__title__ = "graphene_elastic.filter_backends.filtering.common"
__author__ = "Artur Barseghyan <artur.barseghyan@gmail.com>"
__copyright__ = "2019 Artur Barseghyan"
__license__ = "GPL-2.0-only OR LGPL-2.1-or-later"
__all__ = ("FilteringFilterBackend",)
[docs]class FilteringFilterBackend(BaseBackend):
"""Filtering filter backend."""
prefix = "filter"
has_fields = True
def field_belongs_to(self, field_name):
    """Tell whether ``field_name`` is one of this backend's filter fields.

    :param field_name: Name of the field to look up.
    :return: Result of testing ``field_name`` for membership in the
        connection field's ``filter_fields``.
    """
    declared_fields = self.connection_field.filter_fields
    return field_name in declared_fields
def get_field_type(self, field_name, field_value, base_field_type):
    """Build the GraphQL argument used to filter on ``field_name``.

    A new ``graphene.InputObjectType`` subclass is created on the fly. It
    always has a ``VALUE`` entry (typed as ``base_field_type``) plus one
    entry per enabled lookup that has a class in ``LOOKUP_FILTER_MAPPING``.

    :param field_name: Name of the filter field.
    :param field_value: Not used by this implementation.
    :param base_field_type: Type stored under the ``VALUE`` key.
    :return: ``graphene.Argument`` wrapping the generated input type.
    """
    # Read the declared configuration directly. ``get_field_options`` is
    # defined twice in this class and the later definition (which reads the
    # incoming query arguments) is the one that ends up bound, so calling it
    # here would never see the ``lookups`` configured in ``filter_fields``.
    field_options = self.connection_field.filter_fields.get(field_name, {})

    # Shorthand declarations such as ``'category': 'category.raw'`` are
    # plain strings and carry no lookup restrictions.
    if isinstance(field_options, dict) and "lookups" in field_options:
        lookups = field_options["lookups"]
    else:
        lookups = list(ALL_LOOKUP_FILTERS_AND_QUERIES)

    params = {VALUE: base_field_type}
    for lookup in lookups:
        query_cls = LOOKUP_FILTER_MAPPING.get(lookup)
        # Lookups without a mapped class are left out of the input type.
        if query_cls:
            params[lookup] = query_cls()

    type_name = "{}{}{}{}".format(
        DYNAMIC_CLASS_NAME_PREFIX,
        self.prefix,
        self.connection_field.type.__name__,
        field_name.title()
    )
    input_type = type(type_name, (graphene.InputObjectType,), params)
    return graphene.Argument(input_type)
def get_field_options(self, field_name):
    """Return the options declared for ``field_name`` in ``filter_fields``.

    NOTE: this class defines ``get_field_options`` a second time further
    down; when the class body is executed that later definition replaces
    this one.

    :param field_name: Name of the filter field.
    :return: The ``filter_fields`` entry of the field, or an empty dict
        when the field is not declared.
    """
    declared_fields = self.connection_field.filter_fields
    return declared_fields[field_name] if field_name in declared_fields else {}
@classmethod
def get_range_params(cls, value, options):
    """Get params for `range` query.

    Example:

        {
          allPostDocuments(filter:{numViews:{range:{
                lower:"100",
                upper:"200",
                boost:"2.0"
              }}}) {
            edges { node { category title content numViews } }
          }
        }

    The lower bound is mandatory: without it an empty dict is returned.
    The upper bound and the boost are optional. Bounds given as an explicit
    ``null`` (``None``) are treated exactly like absent ones instead of
    being passed to ``float()``.

    :param value: Mapping holding the ``LOWER``, ``UPPER`` and ``BOOST``
        entries of the range.
    :param options: Not used by this implementation.
    :type value: dict
    :type options: dict
    :return: Params to be used in `range` query.
    :rtype: dict
    """
    lower = value.get(LOWER)
    if lower is None:
        return {}

    params = {GTE: float(lower)}
    # Optional entries: input key -> key in the resulting params.
    for source_key, target_key in ((UPPER, LTE), (BOOST, BOOST)):
        raw = value.get(source_key)
        if raw is not None:
            params[target_key] = float(raw)
    return params
@classmethod
def get_gte_lte_params(cls, value, lookup, options):
    """Get params for `gte`, `gt`, `lte` and `lt` query.

    Example:

        {
          allPostDocuments(filter:{numViews:{gt:"100", lt:"200"}}) {
            edges { node { category title content numViews } }
          }
        }

    :param value: Bound to compare against. ``None`` and the empty string
        mean "no bound" and give an empty result; a numeric zero is a
        valid bound and is kept.
    :param lookup: Key under which the bound is stored (the callers in
        this class pass ``"gt"``, ``"gte"``, ``"lt"`` or ``"lte"``).
    :param options: Extra options; a non-null ``BOOST`` entry is converted
        to ``float`` and added to the result.
    :type lookup: str
    :type options: dict
    :return: Params to be used in `range` query.
    :rtype: dict
    """
    # A plain truthiness test would also drop a legitimate bound of 0.
    if value is None or value == "":
        return {}

    params = {lookup: value}
    boost = options.get(BOOST)
    if boost is not None:
        params[BOOST] = float(boost)
    return params
@classmethod
def apply_filter_term(cls, queryset, options, value):
    """Apply `term` filter.

    Example:

        query {
          allPostDocuments(filter:{category:{term:"Python"}}) {
            edges { node { category title content numViews comments } }
          }
        }

    Delegates to ``cls.apply_filter`` with ``"term"`` as the filter name
    and ``{options["field"]: value}`` as its keyword arguments.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Value to filter on.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    term_kwargs = {options["field"]: value}
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["term"],
        kwargs=term_kwargs,
    )
@classmethod
def apply_filter_terms(cls, queryset, options, value):
    """Apply `terms` filter.

    The number of values is not limited.

    Example:

        query {
          allPostDocuments(filter:{category:{
            terms:["Python", "Django"]
          }}) {
            edges { node { category title content numViews comments } }
          }
        }

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Either a list/tuple of values (used as is) or a single
        value that is handed to ``cls.split_lookup_complex_value``.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    if isinstance(value, (list, tuple)):
        term_values = value
    else:
        # Anything else is assumed to be a string that needs splitting.
        term_values = cls.split_lookup_complex_value(value)

    terms_kwargs = {options["field"]: term_values}
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["terms"],
        kwargs=terms_kwargs,
    )
@classmethod
def apply_filter_range(cls, queryset, options, value):
    """Apply `range` filter.

    Example:

        {
          allPostDocuments(filter:{numViews:{range:{
                lower:"100",
                upper:"200"
              }}}) {
            edges { node { category title content numViews } }
          }
        }

    The range body is produced by ``cls.get_range_params`` from ``value``
    and the nested ``options["options"]`` dict (empty when absent).

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Range description passed on to ``get_range_params``.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    range_params = cls.get_range_params(value, options.get('options', {}))
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["range"],
        kwargs={field_name: range_params},
    )
@classmethod
def apply_query_exists(cls, queryset, options, value):
    """Apply `exists` filter.

    Example:

        {
          allPostDocuments(filter:{category:{exists:true}}) {
            edges { node { category title content numViews } }
          }
        }

    A value found in ``TRUE_VALUES`` adds an ``exists`` query on the field,
    a value found in ``FALSE_VALUES`` adds its negation; any other value
    leaves the queryset untouched.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Flag telling whether the field has to exist.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    if value in TRUE_VALUES:
        exists_clause = Q("exists", field=options["field"])
    elif value in FALSE_VALUES:
        exists_clause = ~Q("exists", field=options["field"])
    else:
        # Unrecognised flag: nothing to apply.
        return queryset

    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[exists_clause],
    )
@classmethod
def apply_filter_prefix(cls, queryset, options, value):
    """Apply `prefix` filter.

    Example:

        query {
          allPostDocuments(filter:{category:{prefix:"Pyth"}}) {
            edges { node { category title content numViews comments } }
          }
        }

    Delegates to ``cls.apply_filter`` with ``"prefix"`` as the filter name
    and ``{options["field"]: value}`` as its keyword arguments.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Prefix to filter on.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    prefix_kwargs = {options["field"]: value}
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["prefix"],
        kwargs=prefix_kwargs,
    )
@classmethod
def apply_query_wildcard(cls, queryset, options, value):
    """Apply `wildcard` filter.

    Example:

        query {
          allPostDocuments(filter:{category:{wildcard:"*ytho*"}}) {
            edges { node { category title content numViews comments } }
          }
        }

    The value is used as the wildcard pattern without modification.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Wildcard pattern to match the field against.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_query`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    wildcard_clause = Q("wildcard", **{options["field"]: value})
    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[wildcard_clause],
    )
@classmethod
def apply_query_contains(cls, queryset, options, value):
    """Apply `contains` filter.

    Example:

        query {
          allPostDocuments(filter:{category:{contains:"tho"}}) {
            edges { node { category title content numViews } }
          }
        }

    Implemented as a ``wildcard`` query whose pattern is the value wrapped
    in ``*`` on both sides.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Fragment the field has to contain.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_query`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    pattern = "*{}*".format(value)
    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[Q("wildcard", **{field_name: pattern})],
    )
@classmethod
def apply_query_endswith(cls, queryset, options, value):
    """Apply `endswith` filter.

    Example:

        query {
          allPostDocuments(filter:{category:{endsWith:"thon"}}) {
            edges { node { category title content numViews } }
          }
        }

    Implemented as a ``wildcard`` query whose pattern is the value
    preceded by ``*``.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Suffix the field has to end with.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_query`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    pattern = "*{}".format(value)
    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[Q("wildcard", **{field_name: pattern})],
    )
@classmethod
def apply_query_in(cls, queryset, options, value):
    """Apply `in` functional query.

    The number of values is not limited.

    Example:

        query {
          allPostDocuments(filter:{tags:{in:["photography", "models"]}}) {
            edges { node { category title content numViews tags } }
          }
        }

    One ``term`` query is built per value and the queries are OR-ed
    together. With no values the queryset is returned unchanged.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Either a list/tuple of values (used as is) or a single
        value that is handed to ``cls.split_lookup_complex_value``.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    if isinstance(value, (list, tuple)):
        candidates = value
    else:
        # Anything else is assumed to be a string that needs splitting.
        candidates = cls.split_lookup_complex_value(value)

    term_queries = [
        Q("term", **{options["field"]: candidate})
        for candidate in candidates
    ]
    if not term_queries:
        return queryset

    combined = six.moves.reduce(operator.or_, term_queries)
    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[combined],
    )
@classmethod
def apply_query_gt(cls, queryset, options, value):
    """Apply `gt` functional query.

    Example:

        query {
          allPostDocuments(filter:{numViews:{gt:"100"}}) {
            edges { node { category title content numViews } }
          }
        }

    Issues a ``range`` filter whose body comes from
    ``cls.get_gte_lte_params`` called with the ``"gt"`` lookup.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Exclusive lower bound.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    bound_params = cls.get_gte_lte_params(
        value, "gt", options.get('options', {})
    )
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["range"],
        kwargs={field_name: bound_params},
    )
@classmethod
def apply_query_gte(cls, queryset, options, value):
    """Apply `gte` functional query.

    Example:

        query {
          allPostDocuments(filter:{numViews:{gte:"100"}}) {
            edges { node { category title content numViews } }
          }
        }

    Issues a ``range`` filter whose body comes from
    ``cls.get_gte_lte_params`` called with the ``"gte"`` lookup.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Inclusive lower bound.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    bound_params = cls.get_gte_lte_params(
        value, "gte", options.get('options', {})
    )
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["range"],
        kwargs={field_name: bound_params},
    )
@classmethod
def apply_query_lt(cls, queryset, options, value):
    """Apply `lt` functional query.

    Example:

        query {
          allPostDocuments(filter:{numViews:{lt:"200"}}) {
            edges { node { category title content numViews } }
          }
        }

    Issues a ``range`` filter whose body comes from
    ``cls.get_gte_lte_params`` called with the ``"lt"`` lookup.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Exclusive upper bound.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    bound_params = cls.get_gte_lte_params(
        value, "lt", options.get('options', {})
    )
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["range"],
        kwargs={field_name: bound_params},
    )
@classmethod
def apply_query_lte(cls, queryset, options, value):
    """Apply `lte` functional query.

    Example:

        query {
          allPostDocuments(filter:{numViews:{lte:"200"}}) {
            edges { node { category title content numViews } }
          }
        }

    Issues a ``range`` filter whose body comes from
    ``cls.get_gte_lte_params`` called with the ``"lte"`` lookup.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Inclusive upper bound.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Whatever ``cls.apply_filter`` returns (the modified queryset).
    :rtype: elasticsearch_dsl.search.Search
    """
    field_name = options["field"]
    bound_params = cls.get_gte_lte_params(
        value, "lte", options.get('options', {})
    )
    return cls.apply_filter(
        queryset=queryset,
        options=options,
        args=["range"],
        kwargs={field_name: bound_params},
    )
@classmethod
def apply_query_isnull(cls, queryset, options, value):
    """Apply `isnull` functional query.

    Example:

        query {
          allPostDocuments(filter:{category:{isNull:true}}) {
            edges { node { category title content numViews comments } }
          }
        }

    A value found in ``TRUE_VALUES`` adds a negated ``exists`` query on the
    field, a value found in ``FALSE_VALUES`` adds a plain ``exists`` query;
    any other value leaves the queryset untouched.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Flag telling whether the field has to be null.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    if value in TRUE_VALUES:
        null_clause = ~Q("exists", field=options["field"])
    elif value in FALSE_VALUES:
        null_clause = Q("exists", field=options["field"])
    else:
        # Unrecognised flag: nothing to apply.
        return queryset

    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[null_clause],
    )
@classmethod
def apply_query_exclude(cls, queryset, options, value):
    """Apply `exclude` functional query.

    The number of values is not limited.

    Example:

        query {
          allPostDocuments(filter:{category:{exclude:"Python"}}) {
            edges { node { category title content numViews } }
          }
        }

    Or exclude multiple terms at once:

        query {
          allPostDocuments(filter:{category:{exclude:["Ruby", "Java"]}}) {
            edges { node { category title content numViews } }
          }
        }

    One negated ``term`` query is built per value and the queries are
    AND-ed together. With no values the queryset is returned unchanged.

    :param queryset: Original queryset.
    :param options: Filter options; must contain the ``"field"`` key.
    :param value: Either a list/tuple of values (used as is) or a single
        value that is handed to ``cls.split_lookup_complex_value``.
    :type queryset: elasticsearch_dsl.search.Search
    :type options: dict
    :return: Modified queryset.
    :rtype: elasticsearch_dsl.search.Search
    """
    if isinstance(value, (list, tuple)):
        excluded = value
    else:
        excluded = cls.split_lookup_complex_value(value)

    negated_queries = [
        ~Q("term", **{options["field"]: item})
        for item in excluded
    ]
    if not negated_queries:
        return queryset

    combined = six.moves.reduce(operator.and_, negated_queries)
    return cls.apply_query(
        queryset=queryset,
        options=options,
        args=[combined],
    )
def prepare_filter_fields(self):
    """Prepare filter fields.

    Normalises the declared ``filter_fields`` of every field that is
    actually filtered on in the current request.

    Possible declarations:

        filter_fields = {
            'title': {
                'field': 'title.raw',
                'lookups': [
                    LOOKUP_FILTER_TERM,
                    LOOKUP_FILTER_TERMS,
                ],
                'default_lookup': LOOKUP_FILTER_TERM,
            },
            'category': 'category.raw',
        }

    Normalised result:

        filter_fields = {
            'title': {
                'field': 'title.raw',
                'lookups': [
                    LOOKUP_FILTER_TERM,
                    LOOKUP_FILTER_TERMS,
                ],
                'default_lookup': LOOKUP_FILTER_TERM,
            },
            'category': {
                'field': 'category.raw',
                'lookups': (...),  # all lookups
                'default_lookup': LOOKUP_FILTER_TERM,
            }
        }

    The declared configuration is never modified: dict declarations are
    copied before the missing ``field`` / ``lookups`` keys are filled in.

    :return: Mapping of field name to its normalised options; empty when
        the request carries no arguments under this backend's prefix.
    :rtype: dict
    """
    filter_args = dict(self.args).get(self.prefix)
    if not filter_args:
        return {}

    declared_fields = self.connection_field.filter_fields
    args_mapping = self.connection_field.filter_args_mapping
    all_lookups = tuple(ALL_LOOKUP_FILTERS_AND_QUERIES)

    filter_fields = {}
    for arg in filter_args:
        field = args_mapping.get(arg, None)
        if field is None:
            # Argument that does not map to a known filter field.
            continue

        options = declared_fields.get(field)
        # TODO: Make sure to use custom (user specified) lookups
        if options is None or isinstance(options, six.string_types):
            # Shorthand such as 'category': 'category.raw' (or no
            # declaration at all): the string, if any, is the field path.
            prepared = {
                "field": options or field,
                "default_lookup": LOOKUP_FILTER_TERM,
                "lookups": all_lookups,
            }
        else:
            # Work on a copy so the defaults filled in below do not leak
            # into the shared ``filter_fields`` declaration.
            prepared = dict(options)
            prepared.setdefault("field", field)
            prepared.setdefault("lookups", all_lookups)

        filter_fields[field] = prepared
    return filter_fields
def prepare_query_params(self):
    """Prepare query params.

    Takes the request arguments found under this backend's prefix and
    re-keys them by field name using ``filter_args_mapping``. Arguments
    without a mapping are dropped.

    :return: Mapping of field name to the filters requested for it; empty
        when nothing was passed under the prefix.
    :rtype: dict
    """
    filter_args = dict(self.args).get(self.prefix)
    if not filter_args:
        return {}

    args_mapping = self.connection_field.filter_args_mapping
    resolved = (
        (args_mapping.get(arg, None), filters)
        for arg, filters in filter_args.items()
    )
    return {
        field: filters
        for field, filters in resolved
        if field is not None
    }
def get_field_lookup_param(self, field_name):
    """Get field lookup param.

    :param field_name: Name of the field as used in the request arguments.
    :return: The ``"lookup"`` entry of the field's arguments under this
        backend's prefix, or ``None`` when it is not present.
    """
    prefixed_args = dict(self.args).get(self.prefix, {})
    field_args = prefixed_args.get(field_name, {})
    return field_args.get("lookup", None)
def get_field_options(self, field_name):
    """Get field option params.

    NOTE: an earlier method with the same name exists in this class; being
    defined later, this definition is the one that stays bound.

    :param field_name: Name of the field as used in the request arguments.
    :return: The arguments passed for the field under this backend's
        prefix, or an empty dict when there are none.
    """
    request_args = dict(self.args)
    prefixed_args = request_args.get(self.prefix, {})
    return prefixed_args.get(field_name, {})
def get_filter_query_params(self):
    """Get query params to be filtered on.

    The request can either use the generic ``value`` key:

        query_params = {
            'category': {
                'value': 'Elastic',
            }
        }

    Or name specific lookups:

        query_params = {
            'category': {
                'term': 'Elastic',
                'range': {
                    'lower': Decimal('3.0')
                }
            }
        }

    Note, that `value` would only work on simple types (string, integer,
    decimal). For complex types you would have to use complex param
    anyway. Therefore, it should be forbidden to set `default_lookup` to a
    complex field type.

    For every requested field that has prepared filter options, each
    non-null entry is translated as follows:

    - ``value`` uses the field's ``default_lookup`` (``None`` when the
      field has none).
    - A key listed in the field's ``lookups`` is used as the lookup.
    - Any other key is not enabled for the field and is skipped, instead
      of being recorded with a ``None`` lookup.

    :return: Mapping of field name to a list of dicts with the keys
        ``lookup``, ``values``, ``field`` and ``type``.
    :rtype: dict
    """
    query_params = self.prepare_query_params()
    filter_fields = self.prepare_filter_fields()

    filter_query_params = {}
    for field_name, lookup_params in query_params.items():
        if field_name not in filter_fields:
            continue

        field_options = filter_fields[field_name]
        valid_lookups = field_options["lookups"]
        # With a default lookup the caller may use the bare ``value`` key
        # and does not have to name a lookup explicitly.
        default_lookup = field_options.get("default_lookup")

        prepared = []
        for lookup_param, lookup_options in lookup_params.items():
            if lookup_options is None:
                continue

            if lookup_param == VALUE:
                lookup = default_lookup
            elif lookup_param in valid_lookups:
                lookup = lookup_param
            else:
                # Lookup not enabled for this field: ignore it rather
                # than emitting an entry without a lookup.
                continue

            prepared.append({
                "lookup": lookup,
                "values": lookup_options,
                "field": field_options.get("field", field_name),
                "type": self.doc_type.mapping.properties.name,
            })
        filter_query_params[field_name] = prepared
    return filter_query_params
def filter(self, queryset):
    """Filter.

    Runs every prepared filter option through the ``apply_*`` method that
    matches its lookup. Options whose lookup matches none of the entries
    below (including a missing lookup) are applied as a `term` filter.

    :param queryset: Original queryset.
    :return: Queryset after all requested filters were applied.
    """
    # Ordered (lookup, handler) pairs; the first matching pair wins.
    handlers = (
        (LOOKUP_FILTER_TERMS, self.apply_filter_terms),
        (LOOKUP_FILTER_PREFIX, self.apply_filter_prefix),
        (LOOKUP_QUERY_STARTSWITH, self.apply_filter_prefix),
        (LOOKUP_FILTER_RANGE, self.apply_filter_range),
        (LOOKUP_FILTER_EXISTS, self.apply_query_exists),
        (LOOKUP_FILTER_WILDCARD, self.apply_query_wildcard),
        (LOOKUP_QUERY_CONTAINS, self.apply_query_contains),
        (LOOKUP_QUERY_IN, self.apply_query_in),
        (LOOKUP_QUERY_GT, self.apply_query_gt),
        (LOOKUP_QUERY_GTE, self.apply_query_gte),
        (LOOKUP_QUERY_LT, self.apply_query_lt),
        (LOOKUP_QUERY_LTE, self.apply_query_lte),
        (LOOKUP_QUERY_ENDSWITH, self.apply_query_endswith),
        (LOOKUP_QUERY_ISNULL, self.apply_query_isnull),
        (LOOKUP_QUERY_EXCLUDE, self.apply_query_exclude),
    )

    filter_query_params = self.get_filter_query_params()
    for field_options in filter_query_params.values():
        for option in field_options:
            lookup = option["lookup"]
            # `term` is the fallback when no `default_lookup` option has
            # been given and no explicit lookup matched.
            handler = next(
                (
                    candidate
                    for known_lookup, candidate in handlers
                    if lookup == known_lookup
                ),
                self.apply_filter_term,
            )
            queryset = handler(queryset, option, option['values'])
    return queryset