
Dagster integration for Elasticsearch. Resource and IO manager with optional alias rollover.


dagster-elasticsearch

A Dagster integration for Elasticsearch. Two pieces:

  • ElasticsearchResource is a thin ConfigurableResource over the official elasticsearch Python client.
  • ElasticsearchIOManager bulk-indexes asset outputs as documents, with optional alias rollover for atomic, zero-downtime swaps.

Installation

uv pip install dagster-elasticsearch

# Optional DataFrame / table format support in the IO manager.
uv pip install 'dagster-elasticsearch[pandas]'
uv pip install 'dagster-elasticsearch[polars]'
uv pip install 'dagster-elasticsearch[arrow]'

Picking an Elasticsearch version

elasticsearch-py won't talk to a server with a different major version, so this package is loose-pinned (elasticsearch>=8.10,<11) and leaves the major up to your project. Pin it yourself in pyproject.toml:

# 8.x
dependencies = ["dagster-elasticsearch", "elasticsearch>=8.10,<9"]

# 9.x
dependencies = ["dagster-elasticsearch", "elasticsearch>=9,<10"]

# 10.x (when available)
dependencies = ["dagster-elasticsearch", "elasticsearch>=10,<11"]

The bulk and alias APIs the integration uses haven't changed across 8, 9, and 10.
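To catch a client/server mismatch early, for example in a startup check or a test, you can compare the two major versions directly. This is a minimal sketch and not part of dagster-elasticsearch. same_major is a hypothetical helper. In a real project the client version could come from elasticsearch.__versionstr__ and the server version from client.info()["version"]["number"].

```python
def major(version: str) -> int:
    """Return the leading major component of a dotted version string, e.g. "8.15.1" -> 8."""
    return int(version.split(".", 1)[0])


def same_major(client_version: str, server_version: str) -> bool:
    """True when client and server share a major version (hypothetical helper)."""
    return major(client_version) == major(server_version)


# Illustrative literals only; substitute the real client and server versions.
print(same_major("8.15.1", "8.17.0"))
print(same_major("9.0.2", "8.17.0"))
```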

Resource

from dagster import Definitions, EnvVar, asset
from dagster_elasticsearch import ElasticsearchResource, HostsConfig

@asset
def index_docs(es: ElasticsearchResource) -> None:
    with es.get_client() as client:
        client.index(index="docs", document={"title": "hello"})
        client.indices.refresh(index="docs")

defs = Definitions(
    assets=[index_docs],
    resources={
        "es": ElasticsearchResource(
            connection_config=HostsConfig(
                hosts=["https://es.example.com:9200"],
                api_key=EnvVar("ES_API_KEY"),
            ),
        ),
    },
)

Authentication

HostsConfig and CloudConfig both accept either an api_key or a username + password pair. HostsConfig also takes bearer_auth, ca_certs, and verify_certs.

from dagster import EnvVar
from dagster_elasticsearch import CloudConfig, HostsConfig

# Elastic Cloud
CloudConfig(cloud_id="my-deployment:...", api_key=EnvVar("ES_API_KEY"))

# Self-hosted, basic auth
HostsConfig(
    hosts=["https://es.internal:9200"],
    username="elastic",
    password=EnvVar("ES_PASSWORD"),
    ca_certs="/etc/ssl/certs/elastic.crt",
)
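The HostsConfig and CloudConfig fields correspond to keyword arguments of the official Elasticsearch(...) constructor in elasticsearch-py 8.x: hosts, cloud_id, api_key, basic_auth, bearer_auth, ca_certs and verify_certs. The sketch below shows one plausible translation, including the mutual-exclusion checks implied above. client_kwargs is a hypothetical helper, and the package's actual mapping may differ.

```python
from typing import Any, Optional


def client_kwargs(
    *,
    hosts: Optional[list[str]] = None,
    cloud_id: Optional[str] = None,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    bearer_auth: Optional[str] = None,
    ca_certs: Optional[str] = None,
    verify_certs: Optional[bool] = None,
) -> dict[str, Any]:
    """Translate HostsConfig/CloudConfig-style fields into Elasticsearch(...) kwargs.

    Hypothetical mapping for illustration; not the package's real code.
    """
    if (hosts is None) == (cloud_id is None):
        raise ValueError("pass exactly one of hosts or cloud_id")
    has_basic = username is not None or password is not None
    if has_basic and (username is None or password is None):
        raise ValueError("username and password must be given together")
    if sum([api_key is not None, has_basic, bearer_auth is not None]) > 1:
        raise ValueError("choose a single auth method")

    kwargs: dict[str, Any] = {"hosts": hosts} if hosts is not None else {"cloud_id": cloud_id}
    if api_key is not None:
        kwargs["api_key"] = api_key
    if has_basic:
        kwargs["basic_auth"] = (username, password)  # elasticsearch-py takes a (user, pass) tuple
    if bearer_auth is not None:
        kwargs["bearer_auth"] = bearer_auth
    if ca_certs is not None:
        kwargs["ca_certs"] = ca_certs
    if verify_certs is not None:
        kwargs["verify_certs"] = verify_certs
    return kwargs
```

With a real client this would be used as Elasticsearch(**client_kwargs(...)). Leaving every auth field unset is allowed, which matches the unauthenticated local setup in the IO manager example.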

IO manager

ElasticsearchIOManager takes dict, list[dict], pandas.DataFrame, or Polars DataFrame/LazyFrame outputs and bulk-indexes them via elasticsearch.helpers.bulk. The id_field (default _id) is lifted from each document and used as the Elasticsearch _id.

DataFrame inputs are streamed in bulk_chunk_size-row slices. Polars LazyFrames are collected with the streaming engine. Neither is materialised in full before indexing, so very large parquet-backed assets stay memory-bounded.

from dagster import Definitions, asset
from dagster_elasticsearch import ElasticsearchIOManager, HostsConfig

@asset
def docs() -> list[dict]:
    return [{"_id": "1", "title": "hello"}]

defs = Definitions(
    assets=[docs],
    resources={
        "io_manager": ElasticsearchIOManager(
            connection_config=HostsConfig(hosts=["http://localhost:9200"]),
            index="docs",
        ),
    },
)
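elasticsearch.helpers.bulk accepts an iterable of action dicts with _index, _id and _source keys. The sketch below shows how the id_field lifting and bulk_chunk_size slicing described above might be expressed. Both functions are hypothetical illustrations, not the package's internals.

```python
from itertools import islice
from typing import Any, Iterable, Iterator


def to_bulk_actions(
    docs: Iterable[dict[str, Any]], index: str, id_field: str = "_id"
) -> Iterator[dict[str, Any]]:
    """Yield bulk actions, moving id_field out of each document and into _id."""
    for doc in docs:
        source = dict(doc)  # copy so the caller's dict is left untouched
        action: dict[str, Any] = {"_index": index}
        if id_field in source:
            action["_id"] = source.pop(id_field)
        # Without an id, Elasticsearch generates one for the document.
        action["_source"] = source
        yield action


def chunked(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield lists of at most `size` items without materialising the whole input."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk
```

With a real client the pieces would combine roughly as helpers.bulk(client, to_bulk_actions(rows, "docs"), chunk_size=500). helpers.bulk does its own request chunking, so a chunker like this mainly matters when converting DataFrame slices to dicts before they reach the helper.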

Alias rollover

With use_alias=True, every materialisation writes to a fresh physical index, then atomically swaps a stable alias to the new index via _aliases. Readers and downstream assets always see a consistent view via the alias name.

ElasticsearchIOManager(
    connection_config=HostsConfig(hosts=["http://localhost:9200"]),
    index="docs",          # alias name; physical indices are docs-<suffix>
    use_alias=True,
    rollover_strategy="auto",  # partition key for partitioned assets, else timestamp
    keep_last=3,           # delete older rollover indices beyond N
)
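The atomic swap relies on the Elasticsearch _aliases endpoint. A single request can carry several remove and add actions, and they are applied together. The sketch below builds such a body and picks which old physical indices a keep_last policy would delete. Both helpers are hypothetical. The sketch also assumes the caller already knows which indices currently hold the alias and can list rollover indices oldest first (for example by creation date).

```python
from typing import Any


def alias_swap_body(alias: str, new_index: str, current_indices: list[str]) -> dict[str, Any]:
    """Build a POST /_aliases body that moves `alias` to `new_index` in one request."""
    actions: list[dict[str, Any]] = [
        {"remove": {"index": idx, "alias": alias}} for idx in current_indices if idx != new_index
    ]
    actions.append({"add": {"index": new_index, "alias": alias}})
    return {"actions": actions}


def indices_to_prune(indices_oldest_first: list[str], keep_last: int) -> list[str]:
    """Return physical indices older than the newest `keep_last`, oldest first."""
    if keep_last < 1:
        raise ValueError("keep_last must be at least 1 so the alias target survives")
    # Slicing with a negative stop yields [] when there are keep_last or fewer indices.
    return indices_oldest_first[:-keep_last]
```

With the official client, the body's actions list could be sent as client.indices.update_aliases(actions=body["actions"]).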

rollover_strategy values:

Strategy    Suffix                                      Notes
auto        partition if partitioned, else timestamp    Default.
timestamp   YYYYMMDDtHHMMSSzMICROS                      Microsecond precision avoids collisions.
run_id      Dagster run id                              Opaque but unique per run.
partition   Slugified partition key                     Errors if the asset isn't partitioned.
none        (empty)                                     Alias swap is a no-op. Mostly for tests.
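One way to derive the suffixes listed above is sketched below. rollover_suffix and slugify are hypothetical helpers, and the exact timestamp layout and slug rules the package uses may differ. The timestamp is lowercased because Elasticsearch index names must be lowercase.

```python
import re
from datetime import datetime, timezone
from typing import Optional


def slugify(value: str) -> str:
    """Lowercase and replace runs of characters outside [a-z0-9_] with a single hyphen."""
    return re.sub(r"[^a-z0-9_]+", "-", value.lower()).strip("-")


def rollover_suffix(
    strategy: str,
    *,
    partition_key: Optional[str] = None,
    run_id: Optional[str] = None,
    now: Optional[datetime] = None,
) -> str:
    """Pick a physical-index suffix for one materialisation (hypothetical sketch)."""
    if strategy == "auto":
        strategy = "partition" if partition_key is not None else "timestamp"
    if strategy == "timestamp":
        ts = now or datetime.now(timezone.utc)
        # e.g. 20240102t030405z123456
        return ts.strftime("%Y%m%dT%H%M%SZ%f").lower()
    if strategy == "run_id":
        if run_id is None:
            raise ValueError("run_id strategy needs a run id")
        return run_id.lower()
    if strategy == "partition":
        if partition_key is None:
            raise ValueError("partition strategy requires a partitioned asset")
        return slugify(partition_key)
    if strategy == "none":
        return ""
    raise ValueError(f"unknown rollover_strategy: {strategy!r}")


# The physical index would then be f"{alias}-{suffix}", e.g. docs-20240102t030405z123456.
```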

Partitioned assets (without alias)

When use_alias=False and the asset is partitioned, each partition writes to its own index {index}-{partition_key}. Reads target the same per-partition index.

Note on multi-partition loads. ElasticsearchIOManager is a plain IOManager, not a UPathIOManager. Loading inputs that span multiple partitions (e.g. a downstream asset depending on every partition of an upstream) calls load_input once per partition. That is fine for tens of partitions. For hundreds or thousands, you may want a different read strategy, such as a single ES query against the alias, or a dedicated downstream asset that reads via the resource instead of the IO manager.

Per-asset overrides

Most IO manager fields can be overridden per-asset via definition_metadata (or output_metadata for per-output overrides). Resolution order: output_metadata > definition_metadata > resource-level config.

from dagster import asset

@asset(metadata={
    "index": "search-docs",
    "bulk_chunk_size": 100,
    "rollover_strategy": "run_id",
})
def docs() -> list[dict]:
    ...

Supported keys: index, id_field, bulk_chunk_size, max_chunk_bytes, refresh, rollover_strategy, index_config.
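The resolution order can be pictured as a layered lookup. The sketch below uses plain dicts for the three layers and ignores any Dagster metadata wrapping. resolve is a hypothetical helper, not the package's code.

```python
from typing import Any, Mapping

SUPPORTED_KEYS = frozenset({
    "index", "id_field", "bulk_chunk_size", "max_chunk_bytes",
    "refresh", "rollover_strategy", "index_config",
})


def resolve(
    key: str,
    output_metadata: Mapping[str, Any],
    definition_metadata: Mapping[str, Any],
    resource_config: Mapping[str, Any],
) -> Any:
    """Return the first value found: output metadata, then definition metadata, then resource config."""
    if key not in SUPPORTED_KEYS:
        raise KeyError(f"{key!r} cannot be overridden per asset")
    for layer in (output_metadata, definition_metadata, resource_config):
        if key in layer:
            return layer[key]
    raise KeyError(key)
```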

Lazy reads

For very large source indices, set lazy_load=True so load_input returns an iterator that streams hits via the scroll API instead of building a list[dict]:

ElasticsearchIOManager(
    connection_config=HostsConfig(hosts=["http://localhost:9200"]),
    index="docs",
    lazy_load=True,
    scan_size=1000,           # page size
    scroll_keep_alive="5m",   # scroll context lifetime
)
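Under the hood, scroll-based reads follow a simple loop. Each request returns a scroll id and a page of hits, and the loop stops at the first empty page. The official client's elasticsearch.helpers.scan(client, index=..., size=..., scroll=...) wraps this loop and clears the scroll context afterwards. How scan_size and scroll_keep_alive map onto those arguments is an assumption here. The generic sketch below uses a hypothetical fetch_page callable so the paging logic can run without a cluster.

```python
from typing import Callable, Iterator, Optional

# A page is (scroll id for the next request, hits in this page).
Page = tuple[Optional[str], list[dict]]


def stream_hits(fetch_page: Callable[[Optional[str]], Page]) -> Iterator[dict]:
    """Yield hits page by page until the scroll returns an empty page."""
    scroll_id: Optional[str] = None
    while True:
        scroll_id, hits = fetch_page(scroll_id)
        if not hits:
            return
        yield from hits


# Fake pager standing in for the search/scroll requests.
pages = [[{"_id": "1"}, {"_id": "2"}], [{"_id": "3"}], []]


def fake_fetch(scroll_id: Optional[str]) -> Page:
    n = 0 if scroll_id is None else int(scroll_id)
    return str(n + 1), pages[n]


ids = [hit["_id"] for hit in stream_hits(fake_fetch)]
```

Because the result is a one-shot iterator, a downstream asset should consume it in a single pass rather than calling len() on it or iterating twice.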

Output metadata

Each materialisation records the following on the asset:

Key       Type  When
index     text  Always. Physical index that was written to.
indexed   int   Always. Number of documents indexed.
failures  int   When fail_fast=False and some docs failed.
alias     text  When use_alias=True. The stable alias name.
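With raise_on_error=False, elasticsearch.helpers.bulk returns a tuple of (number of successful actions, list of errors). The sketch below shows how those values could be folded into the metadata keys listed above. materialization_metadata is a hypothetical helper. In an IO manager, the resulting dict could be passed to context.add_output_metadata.

```python
from typing import Any, Optional


def materialization_metadata(
    physical_index: str,
    indexed: int,
    errors: list[Any],
    alias: Optional[str] = None,
) -> dict[str, Any]:
    """Assemble index/indexed/failures/alias metadata from one bulk run."""
    metadata: dict[str, Any] = {"index": physical_index, "indexed": indexed}
    if errors:  # only reachable when fail_fast=False; otherwise the run has already raised
        metadata["failures"] = len(errors)
    if alias is not None:
        metadata["alias"] = alias
    return metadata
```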

Index mappings and settings

Pass index_config to control mappings and shard/replica settings on indices the IO manager creates (alias rollover only):

from dagster_elasticsearch import ElasticsearchIndexConfig, ElasticsearchIOManager, HostsConfig

ElasticsearchIOManager(
    connection_config=HostsConfig(hosts=["http://localhost:9200"]),
    index="docs",
    use_alias=True,
    index_config=ElasticsearchIndexConfig(
        mappings={
            "properties": {
                "title": {"type": "text"},
                "tags": {"type": "keyword"},
            }
        },
        settings={"number_of_shards": 3, "number_of_replicas": 1},
    ),
)

Polars / Parquet handoff

If an upstream asset uses dagster-polars's PolarsParquetIOManager and a downstream asset writes to Elasticsearch, the downstream can take a LazyFrame straight from the upstream parquet:

import dagster as dg
import polars as pl

@dg.asset(io_manager_key="parquet_io_manager")
def raw_records() -> pl.DataFrame:
    return pl.DataFrame([...])

@dg.asset(io_manager_key="es_io_manager")
def search_records(raw_records: pl.LazyFrame) -> pl.LazyFrame:
    return raw_records.filter(pl.col("active"))

The IO manager collects the LazyFrame with Polars's streaming engine and yields bulk_chunk_size-row slices into the Elasticsearch bulk helper, so the full result never has to fit in RAM.

Error handling

Per-document indexing errors raise ElasticsearchBulkIndexError. The underlying error list is on .errors:

from dagster import materialize
from dagster_elasticsearch import ElasticsearchBulkIndexError

try:
    materialize([docs], resources={...})
except ElasticsearchBulkIndexError as e:
    for err in e.errors:
        print(err)

Set fail_fast=False on the IO manager to log per-document errors instead of aborting the materialisation.
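The fail_fast switch comes down to raising versus logging. The sketch below uses a local BulkIndexError class as a stand-in for ElasticsearchBulkIndexError, and handle_bulk_errors is a hypothetical helper. Neither is the package's real code.

```python
import logging
from typing import Any, Optional


class BulkIndexError(Exception):
    """Local stand-in for ElasticsearchBulkIndexError; carries the raw error list on .errors."""

    def __init__(self, errors: list[Any]) -> None:
        super().__init__(f"{len(errors)} document(s) failed to index")
        self.errors = errors


def handle_bulk_errors(errors: list[Any], fail_fast: bool, log: logging.Logger) -> int:
    """Raise on any failure when fail_fast is set; otherwise log each one and return the count."""
    if not errors:
        return 0
    if fail_fast:
        raise BulkIndexError(errors)
    for err in errors:
        log.warning("bulk indexing error: %s", err)
    return len(errors)


log = logging.getLogger("dagster_elasticsearch.example")
caught: Optional[list[Any]] = None
try:
    handle_bulk_errors([{"index": {"_id": "1", "status": 400}}], fail_fast=True, log=log)
except BulkIndexError as e:
    caught = e.errors
```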

Asset checks

Every materialisation records output metadata (index, indexed, failures, alias) on the asset, so an asset check can read it back and fail the run when something looks off (zero docs indexed, too many failures, and so on).

The integration ships a one-liner helper for the most common case:

from dagster import Definitions, asset
from dagster_elasticsearch import ElasticsearchIOManager, build_indexed_asset_check


@asset(io_manager_key="es_io_manager")
def search_docs() -> list[dict]:
    return [{"_id": "1", "title": "hello"}, {"_id": "2", "title": "world"}]


defs = Definitions(
    assets=[search_docs],
    asset_checks=[
        build_indexed_asset_check(asset=search_docs, min_indexed=1, max_failures=0),
    ],
    resources={"es_io_manager": ElasticsearchIOManager(...)},
)

If you need something more bespoke (compare against an upstream row count, alert at warning severity, etc.), write the check by hand:

from dagster import (
    AssetCheckResult,
    AssetCheckSeverity,
    asset,
    asset_check,
)
from dagster_elasticsearch import ElasticsearchIOManager


@asset(io_manager_key="es_io_manager")
def search_docs() -> list[dict]:
    return [{"_id": "1", "title": "hello"}, {"_id": "2", "title": "world"}]


@asset_check(asset=search_docs)
def at_least_one_indexed(context) -> AssetCheckResult:
    record = context.instance.get_latest_materialization_event(search_docs.key)
    metadata = record.asset_materialization.metadata if record else {}
    indexed = metadata.get("indexed").value if metadata.get("indexed") else 0
    failures = metadata.get("failures").value if metadata.get("failures") else 0

    return AssetCheckResult(
        passed=indexed > 0 and failures == 0,
        severity=AssetCheckSeverity.ERROR,
        metadata={"indexed": indexed, "failures": failures},
    )
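The threshold logic can be pulled out into a plain function so it can be unit-tested without a Dagster instance. The sketch below assumes pass/fail semantics like those suggested by build_indexed_asset_check's min_indexed and max_failures arguments; that correspondence is not confirmed. It unwraps objects that carry a .value attribute, as Dagster metadata values do.

```python
from typing import Any, Mapping


def metadata_int(metadata: Mapping[str, Any], key: str, default: int = 0) -> int:
    """Read an int from materialization metadata, unwrapping objects that carry .value."""
    entry = metadata.get(key)
    if entry is None:
        return default
    return int(getattr(entry, "value", entry))


def indexed_check_passes(
    metadata: Mapping[str, Any], min_indexed: int = 1, max_failures: int = 0
) -> bool:
    """Pass when at least min_indexed docs landed and failures stay within max_failures."""
    indexed = metadata_int(metadata, "indexed")
    failures = metadata_int(metadata, "failures")  # absent when nothing failed
    return indexed >= min_indexed and failures <= max_failures
```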

Troubleshooting

Cluster status red and shards stuck unassigned in local Docker. Elasticsearch refuses to allocate shards once the host disk is over the high-watermark threshold (default 90%). Either free up space, or (for local development only) disable the threshold:

curl -XPUT http://localhost:9200/_cluster/settings \
  -H 'Content-Type: application/json' \
  -d '{"persistent":{"cluster.routing.allocation.disk.threshold_enabled":"false"}}'

elasticsearch.BadRequestError: invalid_index_name_exception ... must be lowercase. Index names have to be lowercase. The IO manager lowercases timestamp suffixes itself, but if you supply your own index name make sure it is lowercase too.
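A pre-flight check can catch bad custom index names before any request is sent. The rules below follow the index-name restrictions in the Elasticsearch create-index documentation: lowercase only; none of \ / * ? " < > |, space, comma, # or :; no leading -, _ or +; not . or ..; at most 255 bytes. index_name_problems is a hypothetical helper, not part of the package.

```python
FORBIDDEN = set('\\/*?"<>| ,#:')


def index_name_problems(name: str) -> list[str]:
    """Return reasons `name` would be rejected as an Elasticsearch index name (empty if OK)."""
    problems: list[str] = []
    if not name:
        problems.append("cannot be empty")
    if name != name.lower():
        problems.append("must be lowercase")
    bad = sorted(set(name) & FORBIDDEN)
    if bad:
        problems.append(f"contains forbidden characters: {''.join(bad)}")
    if name[:1] in {"-", "_", "+"}:
        problems.append("cannot start with '-', '_' or '+'")
    if name in {".", ".."}:
        problems.append("cannot be '.' or '..'")
    if len(name.encode("utf-8")) > 255:
        problems.append("longer than 255 bytes")
    return problems
```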

Development

make test    # requires Docker for the Elasticsearch testcontainer
make ruff
make check

Set ES_URL to reuse a running cluster instead of spinning up a fresh container per test session:

ES_URL=http://localhost:9200 uv run pytest

Download files


Source Distribution

dagster_elasticsearch-0.0.2.tar.gz (17.8 kB)

Uploaded Source

Built Distribution


dagster_elasticsearch-0.0.2-py3-none-any.whl (16.7 kB)

Uploaded Python 3

File details

Details for the file dagster_elasticsearch-0.0.2.tar.gz.

File metadata

  • Download URL: dagster_elasticsearch-0.0.2.tar.gz
  • Upload date:
  • Size: 17.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dagster_elasticsearch-0.0.2.tar.gz
Algorithm Hash digest
SHA256 530c53ca41c22ddcad8f28f21c8f4a03bd5887e2fb3443688df0af2bf97bab69
MD5 9973e5663501ac8dbd0d66093e46c2ec
BLAKE2b-256 6ae542b51a06a82a2b6242624154a57fd432aff762f362e9dec58641da0a9037


File details

Details for the file dagster_elasticsearch-0.0.2-py3-none-any.whl.

File metadata

  • Download URL: dagster_elasticsearch-0.0.2-py3-none-any.whl
  • Upload date:
  • Size: 16.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.16 {"installer":{"name":"uv","version":"0.11.16","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dagster_elasticsearch-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 2fc981f5e04bd0bdd5d0e2d58f82d0cd31df6cea6193c4fff037969a0a0b485f
MD5 56032664f02a657326ff0833f010bd2b
BLAKE2b-256 e6b456c28e98d3d118aac5a7fb007077d45811b58612c077491d66bcbf6c051b
