Async multi-data-source connectors (S3, OpenSearch, Athena, MySQL, SQL Server, local files) with AI hooks.

Project description

multids

Async multi-data-source connectors for S3, OpenSearch, Athena, MySQL, SQL Server and local files. Provides async read/write primitives and pluggable AI hooks.

Quick start

  1. Create a virtualenv and install:
python -m venv .venv
.\.venv\Scripts\Activate.ps1   # PowerShell; on Linux/macOS: source .venv/bin/activate
pip install -e .[ai]
  2. See examples/basic_usage.py for a quick demonstration.

JSON Support

LocalConnector and S3Connector include helper methods for reading and writing JSON with Unicode preserved (characters like 你好 are written as-is rather than escaped).

Local JSON

from multids.connectors.local import LocalConnector

local = LocalConnector()
data = {"message": "Hello", "lang": "你好"}

# Write JSON (un-escaped unicode)
await local.write_json(data, "path/to/data.json")

# Read JSON
result = await local.read_json("path/to/data.json")
print(result)
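
The snippets in this README use await at top level for brevity. To run one as a script, wrap the calls in a coroutine and pass it to asyncio.run; for example, a complete runnable version of the local JSON example:

import asyncio

from multids.connectors.local import LocalConnector


async def main():
    local = LocalConnector()
    data = {"message": "Hello", "lang": "你好"}

    # Write JSON (un-escaped unicode), then read it back
    await local.write_json(data, "path/to/data.json")
    result = await local.read_json("path/to/data.json")
    print(result)


asyncio.run(main())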

S3 JSON

from multids.connectors.s3 import S3Connector

s3 = S3Connector(aws_region="us-east-1")
data = {"status": "ok", "info": "مرحبا"}

# Upload JSON object
await s3.write_json(data, bucket="my-bucket", key="status.json")

# Download and parse JSON
result = await s3.read_json(bucket="my-bucket", key="status.json")
print(result)

S3 multipart example

python examples/s3_multipart_example.py

MySQL bulk insert example

python examples/mysql_bulk_example.py

S3 upload behavior and options

  • min_multipart_upload_size (default: 5 MiB): overall object size threshold before the connector switches from a single PUT to a multipart upload. The connector buffers the incoming stream and only creates a multipart upload when the accumulated bytes reach this threshold. This is useful when you're uploading many small JSON files — they will use a single put_object unless you exceed the threshold.
  • part_size (default: 8 MiB): size of each multipart part once multipart is created.
  • enforce_min_part_size (default: False in this library): when enabled, the connector will enforce the AWS minimum part size of 5 MiB for part_size (prevents creating parts smaller than AWS requires). For typical workloads of many small files, the default False makes the connector flexible.
  • force_multipart (write-time flag, default False): pass force_multipart=True to S3Connector.write_stream(...) to force a multipart upload even if the total size is below min_multipart_upload_size. Use this when you need multipart semantics (e.g., resumable uploads across process restarts) for small objects. A configuration sketch follows this list.
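
As a rough illustration of these options, here is how a connector tuned for many small JSON files plus the occasional large object might be configured. This is a sketch: it assumes min_multipart_upload_size, part_size and enforce_min_part_size are accepted as S3Connector constructor keyword arguments (part_size is used that way in the resume example below; the other two are inferred from the option names above).

from multids.connectors.s3 import S3Connector

# Objects under 16 MiB go out as a single put_object; larger streams
# switch to multipart uploads with 8 MiB parts.
s3 = S3Connector(
    aws_region="us-east-1",
    min_multipart_upload_size=16 * 1024 * 1024,
    part_size=8 * 1024 * 1024,
    enforce_min_part_size=False,
)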

Examples

  • Force multipart for a small object (useful for resuming):
await s3.write_stream(my_small_stream(), bucket="b", key="k", force_multipart=True)
  • Normal (default) path: small objects use a single put_object and large objects use multipart as needed.

Resumable uploads & checkpoints

The S3 connector supports simple checkpointing to make multipart uploads resumable across process restarts. When you provide a checkpoint_path to write_stream, the connector writes a small JSON file containing the upload metadata as parts complete. If the upload fails or the process stops, you can call write_stream(..., resume=True, checkpoint_path=...) to attempt a resume.

Checkpoint format (JSON):

{
  "bucket": "my-bucket",
  "key": "path/to/object",
  "upload_id": "<aws-upload-id>",
  "parts": [
    {
      "PartNumber": 1,
      "ETag": "...",
      "size": 5242880
    },
    {
      "PartNumber": 2,
      "ETag": "...",
      "size": 5242880
    }
  ]
}
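
Because the checkpoint is plain JSON, you can inspect it between runs to see how far an interrupted upload got. A minimal sketch (the path is whatever you passed as checkpoint_path):

import json

with open("/tmp/uploads/my-object.chk") as f:
    chk = json.load(f)

uploaded = sum(part["size"] for part in chk["parts"])
print(f"upload {chk['upload_id']}: {len(chk['parts'])} parts, {uploaded} bytes uploaded")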

How it works:

  • While uploading parts, the connector will save the checkpoint file (if checkpoint_path is provided) after each part completes. The checkpoint includes the upload_id and the parts uploaded so far.
  • If an upload is interrupted, re-running write_stream with resume=True and the same checkpoint_path will attempt to list existing parts from S3 using the upload_id and continue uploading remaining parts. If listing parts fails, the connector will fall back to the parts recorded in the checkpoint file.
  • On successful completion the checkpoint file is removed.

Resume example (simplified):

import asyncio

from multids.connectors.s3 import S3Connector


async def upload_with_resume():
    conn = S3Connector(part_size=5 * 1024 * 1024)
    chk = "/tmp/uploads/my-object.chk"

    async def small_stream():
        # imagine this yields many small chunks and we want resumable semantics
        for _ in range(100):
            yield b"{" + b' ' * 1024 + b"}\n"

    # First attempt: force multipart so we have a resumable upload
    await conn.write_stream(
        small_stream(),
        bucket="my-bucket",
        key="my-object.json",
        checkpoint_path=chk,
        force_multipart=True,
    )

    # If the process were interrupted before completion, re-run and resume:
    await conn.write_stream(
        small_stream(), bucket="my-bucket", key="my-object.json", checkpoint_path=chk, resume=True
    )

    await conn.close()


asyncio.run(upload_with_resume())

SQL Server connector

The project includes an async MSSQLConnector implemented with aioodbc (ODBC). Notes:

  • System requirement: install a SQL Server ODBC driver on your host (e.g. Microsoft ODBC Driver for SQL Server).
  • Install the optional sqlserver extras to get the Python runtime dependency:
pip install -e .[sqlserver]

Quick example (using a pool):

import asyncio

from multids.connectors.mssql import MSSQLConnector


async def example():
    conn = MSSQLConnector()
    # configure connection options (or a DSN) on the connector, then open the pool
    await conn.connect_pool()
    rows = await conn.fetch_rows("SELECT id, name FROM users")
    print(rows)
    await conn.close()


asyncio.run(example())

OpenSearch connector

OpenSearchConnector is an async, httpx-based helper for indexing and searching documents in OpenSearch/Elasticsearch.

Examples

from multids.connectors.opensearch import OpenSearchConnector

oc = OpenSearchConnector("http://localhost:9200", api_key="<api-key>")

# Index a single document
await oc.index_doc("my-index", {"name": "alice"})

# Bulk index (sync iterable)
docs = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
await oc.bulk_index("my-index", docs, chunk_size=100)

# Bulk index with an id field and a routing field: see "Advanced OpenSearch examples" below

# Search / scroll
res = await oc.search("my-index", {"query": {"match_all": {}}}, size=10)
async for hit in oc.scroll("my-index", {"query": {"match_all": {}}}):
    print(hit)

await oc.close()

Advanced OpenSearch examples

Indexing with explicit IDs and routing:

docs = [
  {"id": "user-1", "name": "alice", "org": "org1"},
  {"id": "user-2", "name": "bob", "org": "org2"},
]
await oc.bulk_index("my-index", docs, chunk_size=100)

# To use a field as _id and set routing from another field, build the bulk body with
# build_bulk_ndjson (it supports id_field and routing_field) and send it yourself:
async for chunk in OpenSearchConnector.build_bulk_ndjson(docs, index="my-index", id_field="id", routing_field="org"):
    await oc._request("POST", "/_bulk", content=chunk)

Authentication examples

API key (preferred for service-to-service):

oc = OpenSearchConnector("https://es.example.com", api_key="BASE64_APIKEY")
await oc.index_doc("idx", {"name": "s1"})

HTTP Basic (username/password):

oc = OpenSearchConnector("https://es.example.com", basic_auth=("user", "pass"))
await oc.search("idx", {"query": {"match_all": {}}})

Installation note

httpx is not installed by default. If you only need the OpenSearch functionality, install the optional extra:

pip install -e .[opensearch]

Integration tests

We include an optional integration test suite which can run against a real OpenSearch instance. In CI, we start a local OpenSearch service and run tests under tests/integration/. Locally you can run integration tests by setting OPENSEARCH_URL to your instance and running:

$env:OPENSEARCH_URL = 'http://localhost:9200'
pytest tests/integration -q
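
As a sketch, an integration test might look like the following. This assumes pytest-asyncio (or an equivalent async test plugin) is installed and that OpenSearchConnector can be constructed with just the URL for an unauthenticated local instance; the test and index names are illustrative.

import os

import pytest

from multids.connectors.opensearch import OpenSearchConnector


@pytest.mark.asyncio
async def test_index_and_search():
    oc = OpenSearchConnector(os.environ["OPENSEARCH_URL"])
    try:
        await oc.index_doc("it-index", {"name": "alice"})
        res = await oc.search("it-index", {"query": {"match_all": {}}}, size=10)
        assert res is not None
    finally:
        await oc.close()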

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

multids-0.1.0.tar.gz (18.7 kB)

Uploaded Source

Built Distribution


multids-0.1.0-py3-none-any.whl (19.8 kB)

Uploaded Python 3

File details

Details for the file multids-0.1.0.tar.gz.

File metadata

  • Download URL: multids-0.1.0.tar.gz
  • Upload date:
  • Size: 18.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.14 Linux/6.11.0-1018-azure

File hashes

Hashes for multids-0.1.0.tar.gz
Algorithm Hash digest
SHA256 424a5712bcde276e2e56c53a157ffb4fdb82709666530e9ec3fd1e83beed9b8b
MD5 b6e21049ee890565895b667751aec626
BLAKE2b-256 15067a671192e5e7dde216cbae600d89308b3df7f6e5b53d0c5fa3cd37096001

File details

Details for the file multids-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: multids-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 19.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.14 Linux/6.11.0-1018-azure

File hashes

Hashes for multids-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 1492e9847cb3a9425b6f540e45df9ff9d60247f00c99757240ef1c468ddbc414
MD5 f49e762f642693d563f0683762ef886e
BLAKE2b-256 4a941810f3bacfd5711e9cbf6710474a2e411696bc12b56dd9c0b28d330f4cae
