# multids
Async multi-data-source connectors for S3, OpenSearch, Athena, MySQL, SQL Server and local files. Provides async read/write primitives and pluggable AI hooks.
## Quick start

- Create a virtualenv and install:

  ```powershell
  python -m venv .venv
  .\.venv\Scripts\Activate.ps1
  pip install multids[ai]
  ```

- See `examples/basic_usage.py` for a quick demonstration.
## JSON Support

`LocalConnector` and `S3Connector` include helper methods for reading and writing JSON with automatic Unicode support (no escaping of characters like 你好).
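To illustrate what "no escaping" means in practice, here is the difference in plain standard-library `json`, independent of multids:

```python
import json

data = {"lang": "你好"}

# Default json.dumps escapes non-ASCII characters:
print(json.dumps(data))                      # {"lang": "\u4f60\u597d"}

# ensure_ascii=False keeps them readable, which is the behavior
# the write_json/read_json helpers advertise:
print(json.dumps(data, ensure_ascii=False))  # {"lang": "你好"}
```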
### Local JSON

```python
from multids.connectors.local import LocalConnector

local = LocalConnector()
data = {"message": "Hello", "lang": "你好"}

# Write JSON (unescaped Unicode)
await local.write_json(data, "path/to/data.json")

# Read JSON
result = await local.read_json("path/to/data.json")
print(result)
```
### S3 JSON

```python
from multids.connectors.s3 import S3Connector

s3 = S3Connector(aws_region="eu-central-1")
data = {"status": "ok", "info": "مرحبا"}

# Upload JSON object
await s3.write_json(data, bucket="my-bucket", key="status.json")

# Download and parse JSON
result = await s3.read_json(bucket="my-bucket", key="status.json")
print(result)
```
## S3 multipart example

```bash
python examples/s3_multipart_example.py
```

## MySQL bulk insert example

```bash
python examples/mysql_bulk_example.py
```
## S3 upload behavior and options

- `min_multipart_upload_size` (default: 5 MiB): overall object size threshold before the connector switches from a single `PUT` to a multipart upload. The connector buffers the incoming stream and only creates a multipart upload when the accumulated bytes reach this threshold. This is useful when you're uploading many small JSON files: they will use a single `put_object` unless you exceed the threshold.
- `part_size` (default: 8 MiB): size of each multipart part once the multipart upload is created.
- `enforce_min_part_size` (default: `False` in this library): when enabled, the connector enforces the AWS minimum part size of 5 MiB for `part_size` (prevents creating parts smaller than AWS requires). For typical workloads of many small files, the default `False` keeps the connector flexible.
- `force_multipart` (write-time flag, default `False`): pass `force_multipart=True` to `S3Connector.write_stream(...)` to force a multipart upload even if the total size is below `min_multipart_upload_size`. Use this when you need multipart semantics (e.g., resumable uploads across process restarts) for small objects.
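Putting the options together, a connector tuned for many small objects might be constructed like this (a minimal sketch: the keyword names mirror the options above, and the sizes are illustrative only):

```python
from multids.connectors.s3 import S3Connector

# Buffer up to 16 MiB before switching to multipart; once a multipart
# upload starts, send 8 MiB parts. Values are illustrative.
s3 = S3Connector(
    aws_region="eu-central-1",
    min_multipart_upload_size=16 * 1024 * 1024,
    part_size=8 * 1024 * 1024,
    enforce_min_part_size=False,
)
```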
### Examples

- Force multipart for a small object (useful for resuming):

  ```python
  await s3.write_stream(my_small_stream(), bucket="b", key="k", force_multipart=True)
  ```

- Normal (default) path: small objects use a single `put_object` and large objects use multipart as needed.
## Resumable uploads & checkpoints

The S3 connector supports simple checkpointing to make multipart uploads resumable across process restarts. When you provide a `checkpoint_path` to `write_stream`, the connector writes a small JSON file containing the upload metadata as parts complete. If a failure or process stop occurs, you can call `write_stream(..., resume=True, checkpoint_path=...)` to attempt a resume.
Checkpoint format (JSON):

```json
{
  "bucket": "my-bucket",
  "key": "path/to/object",
  "upload_id": "<aws-upload-id>",
  "parts": [
    {
      "PartNumber": 1,
      "ETag": "...",
      "size": 5242880
    },
    {
      "PartNumber": 2,
      "ETag": "...",
      "size": 5242880
    }
  ]
}
```
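Because the checkpoint is plain JSON, you can inspect an interrupted upload's progress with nothing but the standard library (the path below is just whatever you passed as `checkpoint_path`):

```python
import json

with open("/tmp/uploads/my-object.chk") as f:
    chk = json.load(f)

# Sum the sizes recorded for completed parts to see how far the upload got
uploaded = sum(part["size"] for part in chk["parts"])
print(f"upload {chk['upload_id']}: {len(chk['parts'])} parts, {uploaded} bytes so far")
```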
How it works:

- While uploading parts, the connector saves the checkpoint file (if `checkpoint_path` is provided) after each part completes. The checkpoint includes the `upload_id` and the parts uploaded so far.
- If an upload is interrupted, re-running `write_stream` with `resume=True` and the same `checkpoint_path` will attempt to list existing parts from S3 using the `upload_id` and continue uploading the remaining parts. If listing parts fails, the connector falls back to the parts recorded in the checkpoint file.
- On successful completion the checkpoint file is removed.
Resume example (simplified):

```python
import asyncio

from multids.connectors.s3 import S3Connector


async def upload_with_resume():
    conn = S3Connector(part_size=5 * 1024 * 1024)
    chk = "/tmp/uploads/my-object.chk"

    async def small_stream():
        # imagine this yields many small chunks and we want resumable semantics
        for _ in range(100):
            yield b"{" + b" " * 1024 + b"}\n"

    # First attempt: force multipart so we have a resumable upload
    await conn.write_stream(
        small_stream(),
        bucket="my-bucket",
        key="my-object.json",
        checkpoint_path=chk,
        force_multipart=True,
    )

    # If the process were interrupted before completion, re-run and resume:
    await conn.write_stream(
        small_stream(), bucket="my-bucket", key="my-object.json", checkpoint_path=chk, resume=True
    )

    await conn.close()


asyncio.run(upload_with_resume())
```
## SQL Server connector

The project includes an async `MSSQLConnector` implemented with aioodbc (ODBC). Notes:

- System requirement: install a SQL Server ODBC driver on your host (e.g. Microsoft ODBC Driver for SQL Server).
- Install the optional `sqlserver` extra to get the Python runtime dependency:

  ```bash
  pip install multids[sqlserver]
  ```
Quick example (using a pool):

```python
from multids.connectors.mssql import MSSQLConnector


async def example():
    conn = MSSQLConnector()
    # either set connection options on the connector and call connect_pool(), or
    # configure a DSN and call connect_pool()
    await conn.connect_pool()
    rows = await conn.fetch_rows("SELECT id, name FROM users")
    await conn.close()
```
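For a self-contained variant, the sketch below builds a standard ODBC connection string for the Microsoft driver. Note that the `dsn=` constructor keyword is an assumption about this connector's API (check `MSSQLConnector` for the actual option names); the connection-string format itself is standard ODBC:

```python
import asyncio

from multids.connectors.mssql import MSSQLConnector


async def main():
    # Standard connection string for Microsoft ODBC Driver for SQL Server.
    # The dsn= keyword below is an assumed constructor option.
    dsn = (
        "Driver={ODBC Driver 18 for SQL Server};"
        "Server=localhost,1433;Database=mydb;"
        "UID=sa;PWD=example;TrustServerCertificate=yes;"
    )
    conn = MSSQLConnector(dsn=dsn)
    await conn.connect_pool()
    rows = await conn.fetch_rows("SELECT id, name FROM users")
    for row in rows:
        print(row)
    await conn.close()


asyncio.run(main())
```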
## OpenSearch connector

`OpenSearchConnector` is an async, httpx-based helper for indexing and searching documents in OpenSearch/Elasticsearch.

### Examples

```python
from multids.connectors.opensearch import OpenSearchConnector

oc = OpenSearchConnector("http://localhost:9200", api_key="<api-key>")

# Index a single document
await oc.index_doc("my-index", {"name": "alice"})

# Bulk index (sync iterable)
docs = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
await oc.bulk_index("my-index", docs, chunk_size=100)

# Bulk index with id field + routing field
await oc.bulk_index("my-index", docs, chunk_size=100, id_field="id", routing_field="name")

# Search / scroll
res = await oc.search("my-index", {"query": {"match_all": {}}}, size=10)
async for hit in oc.scroll("my-index", {"query": {"match_all": {}}}):
    print(hit)

await oc.close()
```
## Advanced OpenSearch examples

Indexing with explicit IDs and routing:

```python
docs = [
    {"id": "user-1", "name": "alice", "org": "org1"},
    {"id": "user-2", "name": "bob", "org": "org2"},
]
await oc.bulk_index("my-index", docs, chunk_size=100)

# If you want to use a field as _id and also set routing based on another field:
await oc.bulk_index("my-index", docs, chunk_size=100, id_field="id", routing_field="org")

# Or use build_bulk_ndjson directly with id_field/routing_field support
async for chunk in OpenSearchConnector.build_bulk_ndjson(docs, index="my-index", id_field="id", routing_field="org"):
    await oc._request("POST", "/_bulk", content=chunk)
```
## Authentication examples

API key (preferred for service-to-service):

```python
oc = OpenSearchConnector("https://es.example.com", api_key="BASE64_APIKEY")
await oc.index_doc("idx", {"name": "s1"})
```

HTTP Basic (username/password):

```python
oc = OpenSearchConnector("https://es.example.com", basic_auth=("user", "pass"))
await oc.search("idx", {"query": {"match_all": {}}})
```
## Installation note

If you only need the OpenSearch functionality (and want to avoid installing httpx by default), install the optional extra:

```bash
pip install multids[opensearch]
```
## Integration tests

We include an optional integration test suite which can run against a real OpenSearch instance. In CI, we start a local OpenSearch service and run the tests under `tests/integration/`. Locally, you can run the integration tests by setting `OPENSEARCH_URL` to your instance and running:

```powershell
$env:OPENSEARCH_URL = 'http://localhost:9200'
pytest tests/integration -q
```
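A minimal test following this pattern might look like the sketch below (illustrative only: the file name, index name, and assertion are not from the actual suite; the test skips itself when `OPENSEARCH_URL` is not set):

```python
# tests/integration/test_opensearch_roundtrip.py (illustrative sketch)
import asyncio
import os

import pytest

from multids.connectors.opensearch import OpenSearchConnector

OPENSEARCH_URL = os.environ.get("OPENSEARCH_URL")


@pytest.mark.skipif(not OPENSEARCH_URL, reason="OPENSEARCH_URL not set")
def test_index_then_search():
    async def scenario():
        oc = OpenSearchConnector(OPENSEARCH_URL)
        await oc.index_doc("it-smoke-index", {"name": "alice"})
        res = await oc.search("it-smoke-index", {"query": {"match_all": {}}}, size=1)
        await oc.close()
        return res

    # Run the async scenario inside a plain sync test to avoid
    # depending on pytest-asyncio.
    assert asyncio.run(scenario()) is not None
```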