☀️ ray-elasticsearch

Ray data source and sink for Elasticsearch.

Use this minimal library if you plan to read or write data from/to Elasticsearch in a massively parallel fashion for data processing in Ray. Internally, the library uses parallelized sliced point-in-time search for reading and parallelized bulk requests for writing, the two most efficient ways to read and write data with Elasticsearch. Note that this library does not guarantee any specific ordering of the results, though each result's score is returned.
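
For context, here is a rough, hypothetical sketch of what a sliced point-in-time search looks like when issued directly with the elasticsearch client (using elasticsearch-py 8.x keyword arguments); the library performs the equivalent internally, with each slice read in parallel:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumes a local cluster on localhost:9200
num_slices = 4        # hypothetical degree of parallelism

# Open a point-in-time context so all slices see a consistent snapshot.
pit = es.open_point_in_time(index="test", keep_alive="5m")

for slice_id in range(num_slices):
    # In the library, each slice is fetched by a different Ray worker.
    response = es.search(
        pit={"id": pit["id"], "keep_alive": "5m"},
        slice={"id": slice_id, "max": num_slices},
        query={"match_all": {}},
    )
    for hit in response["hits"]["hits"]:
        ...  # process each hit

es.close_point_in_time(id=pit["id"])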

Installation

Install the package from PyPI:

pip install ray-elasticsearch

Usage

This library makes use of Ray's Datasource and Datasink APIs. For reading, use ElasticsearchDatasource and, for writing, use ElasticsearchDatasink.

Read documents

You can read results from a specified index by using an ElasticsearchDatasource with Ray's read_datasource() like so:

from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource

init()
source = ElasticsearchDatasource(index="test")
# Each row is an Elasticsearch hit; extract its _source and sum the id field.
res = read_datasource(source)\
    .map(lambda x: x["_source"])\
    .sum("id")
print(f"Read complete. Sum: {res}")

Use an Elasticsearch query to filter the results:

source = ElasticsearchDatasource(
    index="test",
    query={
        "match": {
            "text": "foo bar",
        },
    },
)

Note that the parallel read does not enforce any ordering of the results, even though the results are scored by Elasticsearch.
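
If your pipeline does require a deterministic order, you can sort the dataset explicitly after reading, at the cost of a full shuffle (a minimal sketch, assuming the documents carry an id field as in the example above):

res = read_datasource(source)\
    .map(lambda x: x["_source"])\
    .sort("id")  # enforce ordering explicitly; triggers a full shuffle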

Normally, it is not necessary to specify a fixed concurrency level. The data source will automatically determine the optimal concurrency based on the disk size of the Elasticsearch index and the Ray cluster capabilities. You can, however, override the concurrency by setting the concurrency parameter in Ray's read_datasource().
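
For example, to cap the read at four parallel tasks (the value is arbitrary):

ds = read_datasource(source, concurrency=4)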

Write documents

Writing documents works similarly by using the ElasticsearchDatasink with Ray's write_datasink():

from ray import init
from ray.data import range
from ray_elasticsearch import ElasticsearchDatasink

init()
sink = ElasticsearchDatasink(index="test")
# Wrap each row in _source so it is indexed as the document body.
range(10_000)\
    .map(lambda x: {"_source": x})\
    .write_datasink(sink)
print("Write complete.")

Concurrency can again be limited by specifying the concurrency parameter in Ray's write_datasink().
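
For example (again with an arbitrary value):

range(10_000)\
    .map(lambda x: {"_source": x})\
    .write_datasink(sink, concurrency=4)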

Elasticsearch connection

By default, the data source and sink access Elasticsearch on localhost:9200. In most cases, however, you will instead want to connect to some remote Elasticsearch instance. To do so, specify the client settings as in the example below, using the same parameters as the Elasticsearch() constructor:

source = ElasticsearchDatasource(
    index="test",
    client_kwargs=dict(
        hosts="<HOST>",
        http_auth=("<USERNAME>", "<PASSWORD>"),
        max_retries=10,
    ),
)

For the full list of allowed arguments in the client_kwargs dictionary, refer to the documentation of the Elasticsearch() constructor.

Elasticsearch DSL

To simplify query construction, you can also use the Elasticsearch DSL and its corresponding data source (ElasticsearchDslDatasource) and sink (ElasticsearchDslDatasink):

from elasticsearch7_dsl import Document, Text
from elasticsearch7_dsl.query import Exists
from ray_elasticsearch import ElasticsearchDslDatasource, ElasticsearchDslDatasink

class Foo(Document):
    class Index:
        name = "test_foo"
    text: str = Text()

source = ElasticsearchDslDatasource(
    index=Foo,
    query=Exists(field="doi"),
)
sink = ElasticsearchDslDatasink(index=Foo)

Note that, unlike in Elasticsearch DSL, the results are not parsed as Python objects but remain Python dictionaries, because Ray internally converts all data into Arrow format.
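
Accordingly, fields are accessed by key even when reading through the DSL data source (a minimal sketch, reusing the Foo source from above):

from ray.data import read_datasource

texts = read_datasource(source)\
    .map(lambda hit: {"text": hit["_source"]["text"]})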

Examples

More examples can be found in the examples directory.

Development

To build this package and contribute to its development, you need to install the build, setuptools, and wheel packages:

pip install build setuptools wheel

(On most systems, these packages are already pre-installed.)

Development installation

Install package and test dependencies:

pip install -e .[tests]

Testing

Verify your changes against the test suite:

ruff check .  # Code formatting and linting
mypy .        # Static typing
bandit -c pyproject.toml -r .  # Security
pytest .      # Unit tests

Please also add tests for your newly developed code.

Build wheels

Wheels for this package can be built with:

python -m build

Support

If you have any problems using this package, please file an issue. We're happy to help!

License

This repository is released under the MIT license.
