Skip to main content

Query Utility for Elasticsearch

Project description

Query Helper for Elasticsearch or OpenSearch

Pypi

This is a helper library for creating elasticsearch or opensearch query proxies using FastAPI.

from fastapi_elasticsearch import ElasticsearchAPIQueryBuilder

# Create a new query_builder for the endpoint.
query_builder = ElasticsearchAPIQueryBuilder()

# Decorate a function as a filter.
# The filter can declare parameters.
@query_builder.filter()
def filter_category(c: Optional[List[str]] = Query([],
                                             description="Category name to filter results.")):
    return {
        "terms": {
            "category": c
        }
    } if len(c) > 0 else None

# Then use the query_builder in your endpoint.
@app.get("/search")
async def search(
        es: Elasticsearch = Depends(get_elasticsearch),
        query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return es.search(
        body=query_body,
        index=index_name
    )

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "category": [
              "the-category"
            ]
          }
        }
      ]
    }
  },
  "size": 10,
  "from": 0
}

To use OpenSearch, simply change the client.

from fastapi_elasticsearch import ElasticsearchAPIQueryBuilder
from opensearchpy import OpenSearch

...

@app.get("/search")
async def search(
        os: OpenSearch = Depends(get_opensearch),
        query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return os.search(
        body=query_body,
        index=index_name
    )
...

# Create a new query_builder for the endpoint.
query_builder = ElasticsearchAPIQueryBuilder()

To control the scoring use a matcher.

# Or decorate a function as a matcher
# (will contribute to the query scoring).
# Parameters can also be used.
@query_builder.matcher()
def match_fields(q: Optional[str] = Query(None)):
    return {
        "multi_match": {
            "query": q,
            "fuzziness": "AUTO",
            "fields": [
                "name^2",
                "description"
            ]
        }
    } if q is not None else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "should": [
        {
          "multi_match": {
            "query": "bob",
            "fuzziness": "AUTO",
            "fields": [
              "name^2",
              "description"
            ]
          }
        }
      ],
      "minimum_should_match": 1
    }
  },
  "size": 10,
  "from": 0
}

To control the ordering, it is possible to annotate a function as sorter.

class Direction(str, Enum):
    asc = "asc"
    desc = "desc"

# Decorate a function as a sorter.
# Parameters can be declared.
@query_builder.sorter()
def sort_by(direction: Optional[Direction] = Query(None)):
    return {
        "name": direction
    } if direction is not None else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "match_all": {}
  },
  "size": 10,
  "from": 0,
  "sort": [
    {
      "name": "asc"
    }
  ]
}

To add highlight functionality, it is possible to annotate a function as highlighter.

# Decorate a function as a highlighter.
# Parameters can also be declared.
@query_builder.highlighter()
def highlight(q: Optional[str] = Query(None,
                                       description="Query to match the document text."),
              h: bool = Query(False,
                              description="Highlight matched text and inner hits.")):
    return {
        "name": {
            "fragment_size": 256,
            "number_of_fragments": 1
        }
    } if q is not None and h else None

The swagger API will result in:

Category Filter

The resulting query will be like this:

{
  "query": {
    "bool": {
      "should": [
        {
          "multi_match": {
            "query": "bob",
            "fuzziness": "AUTO",
            "fields": [
              "name^2"
            ]
          }
        }
      ],
      "minimum_should_match": 1
    }
  },
  "size": 10,
  "from": 0,
  "highlight": {
    "fields": {
      "name": {
        "fragment_size": 256,
        "number_of_fragments": 1
      }
    }
  }
}

Now, a complete example:

app = FastAPI()

query_builder = ElasticsearchAPIQueryBuilder()


@query_builder.filter()
def filter_items():
    return {
        "term": {
            "join_field": "item"
        }
    }


@query_builder.filter()
def filter_category(c: Optional[List[str]] = Query([],
                                                   description="Category name to filter results.")):
    return {
        "terms": {
            "category": c
        }
    } if len(c) > 0 else None


@query_builder.matcher()
def match_fields(q: Optional[str] = Query(None,
                                          description="Query to match the document text.")):
    return {
        "multi_match": {
            "query": q,
            "fuzziness": "AUTO",
            "fields": [
                "name^2",
            ]
        }
    } if q is not None else None


@query_builder.matcher()
def match_fragments(q: Optional[str] = Query(None,
                                             description="Query to match the document text."),
                    h: bool = Query(False,
                                    description="Highlight matched text and inner hits.")):
    if q is not None:
        matcher = {
            "has_child": {
                "type": "fragment",
                "score_mode": "max",
                "query": {
                    "bool": {
                        "minimum_should_match": 1,
                        "should": [
                            {
                                "match": {
                                    "content": {
                                        "query": q,
                                        "fuzziness": "auto"
                                    }
                                }
                            },
                            {
                                "match_phrase": {
                                    "content": {
                                        "query": q,
                                        "slop": 3,
                                        "boost": 50
                                    }
                                }
                            },
                        ]
                    }
                }
            }
        }
        if h:
            matcher["has_child"]["inner_hits"] = {
                "size": 1,
                "_source": "false",
                "highlight": {
                    "fields": {
                        "content": {
                            "fragment_size": 256,
                            "number_of_fragments": 1
                        }
                    }
                }
            }
        return matcher
    else:
        return None


class Direction(str, Enum):
    asc = "asc"
    desc = "desc"


@query_builder.sorter()
def sort_by(direction: Optional[Direction] = Query(None)):
    return {
        "name": direction
    } if direction is not None else None


@query_builder.highlighter()
def highlight(q: Optional[str] = Query(None,
                                       description="Query to match the document text."),
              h: bool = Query(False,
                              description="Highlight matched text and inner hits.")):
    return {
        "name": {
            "fragment_size": 256,
            "number_of_fragments": 1
        }
    } if q is not None and h else None


@app.get("/search")
async def search(query_body: Dict = Depends(query_builder.build())) -> JSONResponse:
    return es.search(
        body=query_body,
        index=index_name
    )

Also it is possible to customize the generated query body using the decorator search_builder.

from typing import List, Dict

@query_builder.search_builder()
def build_search_body(size: int = 10,
                      start_from: int = 0,
                      source: Union[List, Dict, str] = None,
                      minimum_should_match: int = 1,
                      filters: List[Dict] = [],
                      matchers: List[Dict] = [],
                      highlighters: List[Dict] = [],
                      sorters: List[Dict] = []) -> Dict:
    return {
        "query": {
            ...
        },
        ...
    }

Adopt this project: if you like and want to adopt it, talk to me.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi-elasticsearch-0.7.0.tar.gz (4.8 kB view hashes)

Uploaded Source

Built Distribution

fastapi_elasticsearch-0.7.0-py3-none-any.whl (5.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page