☀️ ray-elasticsearch
Ray data source and sink for Elasticsearch.
Use this minimal library if you plan to read or write data from/to Elasticsearch in a massively parallel fashion for data processing in Ray. Internally, the library uses parallelized sliced point-in-time searches for reading and parallelized bulk requests for writing, the two most efficient ways to read/write data to/from Elasticsearch. Note that this library does not guarantee any specific ordering of the results, though the scores are returned.
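To illustrate the reading mechanism, the sliced point-in-time approach can be sketched as follows. This is a simplified illustration of Elasticsearch's slice API, not this library's actual internals: each parallel worker issues a search against the same point-in-time (PIT) snapshot, but with a distinct slice ID, so the index is partitioned across workers without overlap.

```python
# Sketch (assumed mechanism): build one search request body per slice,
# all sharing the same point-in-time ID.
def sliced_search_bodies(pit_id: str, num_slices: int) -> list:
    return [
        {
            "pit": {"id": pit_id, "keep_alive": "1m"},
            "slice": {"id": i, "max": num_slices},
            "query": {"match_all": {}},
        }
        for i in range(num_slices)
    ]
```

Each worker would then page through its own slice independently, which is what makes the read embarrassingly parallel.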
Installation
Install the package from PyPI:
pip install ray-elasticsearch
Usage
This library makes use of Ray's Datasource and Datasink APIs.
For reading, use ElasticsearchDatasource and, for writing, use ElasticsearchDatasink.
Read documents
You can read results from a specified index by using an ElasticsearchDatasource with Ray's read_datasource(). Assuming you have an index named test that stores some numeric value in the value field, you can efficiently compute the sum of all values like so:
from ray import init
from ray.data import read_datasource
from ray_elasticsearch import ElasticsearchDatasource
init()
source = ElasticsearchDatasource(index="test")
res = read_datasource(source) \
    .sum("value")
print(f"Read complete. Sum: {res}")
Use an Elasticsearch query to filter the results:
source = ElasticsearchDatasource(
    index="test",
    query={
        "match": {
            "text": "foo bar",
        },
    },
)
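Since the query argument is a plain Elasticsearch query DSL dictionary, several clauses can be combined with a standard bool query before being passed to the datasource. The helper below is hypothetical (not part of ray-elasticsearch), shown only to sketch the composition:

```python
# Hypothetical helper: compose several clauses into one bool filter query.
def bool_filter(*clauses: dict) -> dict:
    return {"bool": {"filter": list(clauses)}}

query = bool_filter(
    {"match": {"text": "foo bar"}},
    {"range": {"value": {"gte": 10}}},
)
```

The resulting dictionary can be passed directly as the query argument of the ElasticsearchDatasource.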
Note that the parallel read does not enforce any ordering of the results even though the results are scored by Elasticsearch.
With the default settings, you can still access the retrieved score from the Ray Dataset's _score column.
You do not need to set a fixed maximum concurrency level, but it is often a good idea to limit concurrency (and hence the number of simultaneous requests to the Elasticsearch cluster) by setting the override_num_blocks parameter in Ray's read_datasource():
source = ElasticsearchDatasource(index="test")
ds = read_datasource(source, override_num_blocks=100)
The override_num_blocks parameter determines the number of slices for the sliced point-in-time request. In typical scenarios, this number should not be much larger than 1000. Even with hundreds or thousands of slices, you can still limit how many requests are sent to the Elasticsearch cluster in parallel with Ray's concurrency parameter:
source = ElasticsearchDatasource(index="test")
ds = read_datasource(source, override_num_blocks=1000, concurrency=100)
Normally, it suffices to just set override_num_blocks reasonably small, e.g., to 100 or to the number of Elasticsearch data nodes in the cluster, and to keep the concurrency unchanged.
Write documents
Writing documents works similarly by using the ElasticsearchDatasink with Ray's write_datasink():
from ray import init
from ray.data import range
from ray_elasticsearch import ElasticsearchDatasink
init()
sink = ElasticsearchDatasink(index="test")
range(10_000) \
    .rename_columns({"id": "value"}) \
    .write_datasink(sink)
print("Write complete.")
Write concurrency can be limited by specifying the concurrency parameter in Ray's write_datasink().
It is advisable to keep the concurrency at or below the number of data nodes in the Elasticsearch cluster, e.g., at 100.
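For intuition, the bulk requests used for writing pair each document with an action line naming the target index. The sketch below follows the format of the Elasticsearch bulk API; it is an assumption about what the library sends internally, shown only for illustration:

```python
# Sketch of a bulk indexing payload: one action line per document,
# followed by the document itself.
def bulk_actions(index: str, rows: list) -> list:
    actions = []
    for row in rows:
        actions.append({"index": {"_index": index}})
        actions.append(row)
    return actions
```

Batching many documents into one bulk request is what makes parallel writes efficient: each Ray write task amortizes the request overhead over a whole block of rows.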
Elasticsearch connection and authentication
By default, the data source and sink access Elasticsearch at localhost:9200, the default of the elasticsearch Python library. In most cases, however, you will want to connect to some remote Elasticsearch instance. To do so, specify the connection settings as in the example below, using the same parameters as the Elasticsearch() constructor:
source = ElasticsearchDatasource(
    index="test",
    hosts="<HOST>",
    http_auth=("<USERNAME>", "<PASSWORD>"),
    max_retries=10,
)
All client-related keyword arguments to the ElasticsearchDatasource or ElasticsearchDatasink are passed on as-is to the Elasticsearch() constructor. Refer to the Elasticsearch client documentation for an overview of the supported connection settings.
Data schema auto-guessing
The ElasticsearchDatasource will internally get the mapping for the given index from Elasticsearch, and guess the PyArrow data schema based on Elasticsearch's mapping field types.
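The guessing step can be sketched roughly as follows. The type table below is an illustrative assumption, not the library's actual Elasticsearch-to-PyArrow mapping, which may differ in detail:

```python
# Illustrative (assumed) mapping from Elasticsearch field types to
# PyArrow-style type names.
ES_TO_ARROW_TYPE = {
    "keyword": "string",
    "text": "string",
    "integer": "int32",
    "long": "int64",
    "float": "float32",
    "double": "float64",
    "boolean": "bool",
    "date": "timestamp[ms]",
}

def guess_field_types(mapping_properties: dict) -> dict:
    """Guess a type name for each field in an index mapping's properties."""
    return {
        name: ES_TO_ARROW_TYPE.get(props.get("type"), "string")
        for name, props in mapping_properties.items()
    }
```

The input here corresponds to the "properties" object of an index mapping as returned by Elasticsearch's get-mapping API.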
Elasticsearch DSL
This library integrates well with the Elasticsearch DSL library, to simplify building queries, to derive a more accurate data schema from a Document class, or to simplify transforming data from Elasticsearch data sources in Ray.
Query DSL
To simplify query construction, just use any of the query classes from the Elasticsearch DSL library:
from elasticsearch_dsl.query import Exists
from ray_elasticsearch import ElasticsearchDatasource
source = ElasticsearchDatasource(
index="foo",
query=Exists(field="doi"),
)
All usual operators to combine queries are supported.
Document mapping and index
This library can also improve the schema auto-guessing capabilities by using an Elasticsearch DSL Document class:
from elasticsearch_dsl import Document, Text
from ray_elasticsearch import ElasticsearchDatasource
class Foo(Document):
    text = Text(required=True)
    class Index:
        name = "test_foo"
source = ElasticsearchDatasource(index=Foo)
Most importantly, this will make the schema reflect the required and multi properties of the Document's fields to set PyArrow's nullable argument and/or wrap schema field types as lists.
Note that the rows returned by an ElasticsearchDatasource, even if using a Document class as index or schema, will still be dictionaries. Due to the way Ray stores the data internally (in PyArrow format), we cannot directly return instances of the given Document class. Use the provided function decorators to still easily transform the data.
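The idea behind such a decorator can be sketched in plain Python. The code below uses a hypothetical unwrap_into decorator and a dataclass stand-in for a Document; it illustrates the assumed semantics, not the library's actual implementation:

```python
# Sketch: a decorator that hands the wrapped function a typed view of
# each row, built from the row's non-metadata fields.
from dataclasses import dataclass
from functools import wraps

def unwrap_into(cls):
    def decorator(fn):
        @wraps(fn)
        def wrapper(row: dict) -> dict:
            # Drop Elasticsearch meta columns (e.g., _id) before
            # constructing the typed object.
            data = {k: v for k, v in row.items() if not k.startswith("_")}
            return fn(row, cls(**data))
        return wrapper
    return decorator

@dataclass
class FooRow:
    text: str

@unwrap_into(FooRow)
def shout(row: dict, doc: FooRow) -> dict:
    return {**row, "shouted": doc.text.upper()}
```

The wrapped function still receives the raw row dictionary, so metadata columns remain accessible alongside the typed object.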
Simplified data transformations
Two function decorators are provided that help you with transforming the data from an ElasticsearchDatasource:
from typing import Any
from ray_elasticsearch import unwrap_document

@unwrap_document(Foo)
def add_custom_field(row: dict[str, Any], document: Foo) -> dict[str, Any]:
    return {**row, "custom": document.text}

ds = ds.map(add_custom_field)
Or to map batches of data:
from typing import Sequence
from pandas import DataFrame
from ray_elasticsearch import unwrap_documents

@unwrap_documents(Foo)
def add_custom_field_batch(batch: DataFrame, documents: Sequence[Foo]) -> DataFrame:
    batch["custom"] = [document.text for document in documents]
    return batch

ds = ds.map_batches(add_custom_field_batch)
Elasticsearch Pydantic
Instead of the standard Elasticsearch DSL Document class, you can also use the BaseDocument class from the elasticsearch-pydantic library, to add Pydantic validation and type-checking to your Elasticsearch models. As that library is fully compatible with Elasticsearch DSL, its model classes can be used as a drop-in replacement and still support the more accurate data schema guessing (from a BaseDocument class), or simplified data transformations.
Both features are covered by our test suite, which regularly checks the compatibility of the two libraries.
Selecting source and meta fields
In Elasticsearch, any document returned from a search request keeps the actual data nested in the _source field, and has some metadata (e.g., _id and _index) on the top level. However, working with nested columns is tricky with Ray (e.g., nested columns cannot be renamed). The ray-elasticsearch library automatically unwraps the source field. For example, consider the following Elasticsearch record:
{
    "_index": "test",
    "_type": "_doc",
    "_id": "1",
    "_score": null,
    "_source": {
        "value": 1
    }
}
Using the default settings, the corresponding row in the Ray dataset will look like this:
{
    "_index": "test",
    "_type": "_doc",
    "_id": "1",
    "_score": None,
    "value": 1
}
You can also select the source and metadata fields explicitly, using the source_fields and meta_fields arguments:
source = ElasticsearchDatasource(
    index="test",
    source_fields=["value"],
    meta_fields=["id"],
)
With the above setting, just the ID and value will be stored in the Ray Dataset's blocks:
{
    "_id": "1",
    "value": 1
}
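The unwrapping and field selection above can be sketched as a single transformation over an Elasticsearch hit. This is assumed behavior matching the examples, not the library's actual code:

```python
# Sketch: lift `_source` fields to the top level, keeping either the
# explicitly selected metadata fields or all of them.
from typing import Optional

def unwrap_hit(hit: dict, meta_fields: Optional[list] = None) -> dict:
    if meta_fields is not None:
        # Selected meta fields are given without the leading underscore.
        meta_keys = {f"_{field}" for field in meta_fields}
    else:
        meta_keys = {key for key in hit if key != "_source"}
    row = {key: value for key, value in hit.items() if key in meta_keys}
    row.update(hit.get("_source", {}))
    return row
```

Applied to the record above, this reproduces both the default row and the reduced row with only the ID and value.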
Examples
More examples can be found in the examples directory.
Compatibility
This library works fine with any of the following Pip packages installed:
- elasticsearch
- elasticsearch7
- elasticsearch8
- elasticsearch-dsl<8.12.0
- elasticsearch7-dsl
- elasticsearch8-dsl<8.12.0
- elasticsearch-pydantic
The ray-elasticsearch library will automatically detect if the Elasticsearch DSL or Pydantic helpers are installed, and add support for additional features accordingly.
Development
To build this package and contribute to its development, you need to install the build, setuptools, and wheel packages:
pip install build setuptools wheel
(On most systems, these packages are already pre-installed.)
Development installation
Install package and test dependencies:
pip install -e .[tests,tests-es7] # For elasticsearch~=7.0
pip install -e .[tests,tests-es7-major] # For elasticsearch7
pip install -e .[tests,tests-es8] # For elasticsearch~=8.0
pip install -e .[tests,tests-es8-major] # For elasticsearch8
pip install -e .[tests,tests-es7-dsl] # For elasticsearch-dsl~=7.0
pip install -e .[tests,tests-es7-dsl-major] # For elasticsearch7-dsl
pip install -e .[tests,tests-es8-dsl] # For elasticsearch-dsl~=8.0
pip install -e .[tests,tests-es8-dsl-major] # For elasticsearch8-dsl
pip install -e .[tests,tests-es7-pydantic] # For elasticsearch-pydantic and elasticsearch-dsl~=7.0
pip install -e .[tests,tests-es7-pydantic-major] # For elasticsearch-pydantic and elasticsearch7-dsl
pip install -e .[tests,tests-es8-pydantic] # For elasticsearch-pydantic and elasticsearch-dsl~=8.0
pip install -e .[tests,tests-es8-pydantic-major] # For elasticsearch-pydantic and elasticsearch8-dsl
Testing
Verify your changes against the test suite:
ruff check . # Code format and LINT
mypy . # Static typing
pytest . # Unit tests
Please also add tests for your newly developed code.
Build wheels
Wheels for this package can be built with:
python -m build
Support
If you have any problems using this package, please file an issue. We're happy to help!
License
This repository is released under the MIT license.