
multids


Async multi-data-source connectors for S3, OpenSearch, Athena, MySQL, SQL Server and local files. Provides async read/write primitives and pluggable AI hooks.

Quick start

  1. Create a virtualenv and install:
python -m venv .venv
.\.venv\Scripts\Activate.ps1   # Windows PowerShell; on macOS/Linux: source .venv/bin/activate
pip install multids[ai]
  2. See examples/basic_usage.py for a quick demonstration.

JSON Support

LocalConnector and S3Connector include helper methods for reading and writing JSON with Unicode preserved (non-ASCII characters such as 你好 are not escaped).

Local JSON

from multids.connectors.local import LocalConnector

local = LocalConnector()
data = {"message": "Hello", "lang": "你好"}

# Write JSON (unescaped Unicode)
await local.write_json(data, "path/to/data.json")

# Read JSON
result = await local.read_json("path/to/data.json")
print(result)

S3 JSON

from multids.connectors.s3 import S3Connector

s3 = S3Connector(aws_region="eu-central-1")
data = {"status": "ok", "info": "مرحبا"}

# Upload JSON object
await s3.write_json(data, bucket="my-bucket", key="status.json")

# Download and parse JSON
result = await s3.read_json(bucket="my-bucket", key="status.json")
print(result)

S3 multipart example

python examples/s3_multipart_example.py

MySQL bulk insert example

python examples/mysql_bulk_example.py

S3 upload behavior and options

  • min_multipart_upload_size (default: 5 MiB): overall object size threshold at which the connector switches from a single PUT to a multipart upload. The connector buffers the incoming stream and only creates a multipart upload once the accumulated bytes reach this threshold. This is useful when you upload many small JSON files; they go through a single put_object unless they exceed the threshold.
  • part_size (default: 8 MiB): size of each part once a multipart upload has been created.
  • enforce_min_part_size (default: False in this library): when enabled, the connector enforces the AWS minimum part size of 5 MiB for part_size (prevents creating parts smaller than AWS requires). For typical workloads of many small files, the default of False keeps the connector flexible.
  • force_multipart (write-time flag, default: False): pass force_multipart=True to S3Connector.write_stream(...) to force a multipart upload even if the total size is below min_multipart_upload_size. Use this when you need multipart semantics (e.g. resumable uploads across process restarts) for small objects. See the construction sketch below.
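
A construction sketch that puts these options together. part_size and aws_region appear as constructor arguments elsewhere in this README; min_multipart_upload_size and enforce_min_part_size are assumed here to be constructor keyword arguments named after the options above.

from multids.connectors.s3 import S3Connector

# part_size and aws_region are shown elsewhere in this README; the other two
# kwargs are assumed to match the option names documented above.
s3 = S3Connector(
    aws_region="eu-central-1",
    min_multipart_upload_size=5 * 1024 * 1024,  # switch to multipart at 5 MiB
    part_size=8 * 1024 * 1024,                  # 8 MiB parts once multipart starts
    enforce_min_part_size=False,                # allow parts below the AWS 5 MiB minimum
)

# force_multipart is a write-time flag, not a constructor option;
# my_stream() stands in for any async generator yielding byte chunks.
await s3.write_stream(my_stream(), bucket="my-bucket", key="big-object", force_multipart=False)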

Examples

  • Force multipart for a small object (useful for resuming):
await s3.write_stream(my_small_stream(), bucket="b", key="k", force_multipart=True)
  • Normal (default) path: small objects use a single put_object and large objects use multipart as needed.

Resumable uploads & checkpoints

The S3 connector supports simple checkpointing to make multipart uploads resumable across process restarts. When you provide a checkpoint_path to write_stream, the connector writes a small JSON file containing the upload metadata as parts complete. If a failure or process stop occurs, you can call write_stream(..., resume=True, checkpoint_path=...) to attempt a resume.

Checkpoint format (JSON):

{
  "bucket": "my-bucket",
  "key": "path/to/object",
  "upload_id": "<aws-upload-id>",
  "parts": [
    {
      "PartNumber": 1,
      "ETag": "...",
      "size": 5242880
    },
    {
      "PartNumber": 2,
      "ETag": "...",
      "size": 5242880
    }
  ]
}
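
A small sketch for inspecting a leftover checkpoint file, using only the fields shown above (the path is illustrative):

import json
from pathlib import Path

chk = Path("/tmp/uploads/my-object.chk")  # illustrative checkpoint path
if chk.exists():
    state = json.loads(chk.read_text())
    uploaded = sum(p["size"] for p in state["parts"])
    print(f"upload {state['upload_id']} -> s3://{state['bucket']}/{state['key']}")
    print(f"{len(state['parts'])} parts recorded, {uploaded} bytes already uploaded")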

How it works:

  • While uploading parts, the connector will save the checkpoint file (if checkpoint_path is provided) after each part completes. The checkpoint includes the upload_id and the parts uploaded so far.
  • If an upload is interrupted, re-running write_stream with resume=True and the same checkpoint_path will attempt to list existing parts from S3 using the upload_id and continue uploading remaining parts. If listing parts fails, the connector will fall back to the parts recorded in the checkpoint file.
  • On successful completion the checkpoint file is removed.

Resume example (simplified):

import asyncio

from multids.connectors.s3 import S3Connector


async def upload_with_resume():
    conn = S3Connector(part_size=5 * 1024 * 1024)
    chk = "/tmp/uploads/my-object.chk"

    async def small_stream():
        # imagine this yields many small chunks and we want resumable semantics
        for _ in range(100):
            yield b"{" + b' ' * 1024 + b"}\n"

    # First attempt: force multipart so we have a resumable upload
    await conn.write_stream(
        small_stream(),
        bucket="my-bucket",
        key="my-object.json",
        checkpoint_path=chk,
        force_multipart=True,
    )

    # If the process were interrupted before completion, re-run and resume:
    await conn.write_stream(
        small_stream(), bucket="my-bucket", key="my-object.json", checkpoint_path=chk, resume=True
    )

    await conn.close()


asyncio.run(upload_with_resume())

SQL Server connector

The project includes an async MSSQLConnector implemented with aioodbc (ODBC). Notes:

  • System requirement: install a SQL Server ODBC driver on your host (e.g. Microsoft ODBC Driver for SQL Server).
  • Install the optional sqlserver extras to get the Python runtime dependency:
pip install multids[sqlserver]

Quick example (using a pool):

from multids.connectors.mssql import MSSQLConnector


async def example():
    conn = MSSQLConnector()
    # Configure connection options (or a DSN) on the connector, then open the pool.
    await conn.connect_pool()
    rows = await conn.fetch_rows("SELECT id, name FROM users")
    await conn.close()
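
For reference, a minimal sketch of raw aioodbc usage (the library the connector wraps), including a typical SQL Server ODBC connection string; the server, database and credentials are placeholders:

import asyncio

import aioodbc

DSN = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=myserver.example.com,1433;"
    "DATABASE=mydb;UID=myuser;PWD=mypassword;"
    "TrustServerCertificate=yes;"
)


async def raw_aioodbc_example():
    # create a small connection pool against the DSN above
    pool = await aioodbc.create_pool(dsn=DSN)
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT id, name FROM users")
            rows = await cur.fetchall()
            print(rows)
    pool.close()
    await pool.wait_closed()


asyncio.run(raw_aioodbc_example())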

OpenSearch connector

OpenSearchConnector is an async, httpx-based helper for indexing and searching documents in OpenSearch/Elasticsearch.

Examples

from multids.connectors.opensearch import OpenSearchConnector

oc = OpenSearchConnector("http://localhost:9200", api_key="<api-key>")

# Index a single document
await oc.index_doc("my-index", {"name": "alice"})

# Bulk index (sync iterable)
docs = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
await oc.bulk_index("my-index", docs, chunk_size=100)

# For bulk indexing with explicit ids and routing, see the advanced examples below

# Search / scroll
res = await oc.search("my-index", {"query": {"match_all": {}}}, size=10)
async for hit in oc.scroll("my-index", {"query": {"match_all": {}}}):
  print(hit)

await oc.close()

Advanced OpenSearch examples

Indexing with explicit IDs and routing:

docs = [
  {"id": "user-1", "name": "alice", "org": "org1"},
  {"id": "user-2", "name": "bob", "org": "org2"},
]
await oc.bulk_index("my-index", docs, chunk_size=100)

# To use a field as _id and set routing from another field, build the bulk
# payload with build_bulk_ndjson, which supports id_field / routing_field:
async for chunk in OpenSearchConnector.build_bulk_ndjson(docs, index="my-index", id_field="id", routing_field="org"):
    await oc._request("POST", "/_bulk", content=chunk)

Authentication examples

API key (preferred for service-to-service):

oc = OpenSearchConnector("https://es.example.com", api_key="BASE64_APIKEY")
await oc.index_doc("idx", {"name": "s1"})

HTTP Basic (username/password):

oc = OpenSearchConnector("https://es.example.com", basic_auth=("user", "pass"))
await oc.search("idx", {"query": {"match_all": {}}})

Installation note

If you only need the OpenSearch functionality, install the optional extra, which pulls in httpx (not installed by the base package):

pip install multids[opensearch]

Integration tests

We include an optional integration test suite that can run against a real OpenSearch instance. In CI, we start a local OpenSearch service and run the tests under tests/integration/. Locally, you can run the integration tests by pointing OPENSEARCH_URL at your instance and running (PowerShell shown):

$env:OPENSEARCH_URL = 'http://localhost:9200'
pytest tests/integration -q
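
On macOS/Linux shells, the equivalent is:

OPENSEARCH_URL='http://localhost:9200' pytest tests/integration -q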
