Skip to main content

Sync Milvus data to PostgreSQL for validation

Project description

PyMilvus PostgreSQL

pymilvus-pg is a Python library primarily designed for validating Milvus data correctness. It achieves this by synchronizing Milvus write operations (inserts, deletes, upserts) to a PostgreSQL database in real-time. By comparing the data in Milvus with the shadow data in PostgreSQL, users can verify the consistency and accuracy of their Milvus deployments. While it facilitates data synchronization, its core utility lies in providing a robust mechanism for data validation.

Features

  • Milvus Client Extension: Extends the MilvusClient functionality.
  • Data Synchronization: Keeps data in Milvus and a PostgreSQL shadow database synchronized.
  • Data Export: Allows exporting collection data from the shadow PostgreSQL instance.
  • Query Correctness Validation: Enables verification of Milvus query results by comparing them against PostgreSQL.
  • Milvus Data Correctness Validation: Enables full data comparison between Milvus and PostgreSQL.
  • Three-Way Validation with LMDB: Built-in LMDB integration (enabled by default) provides automatic error source identification when inconsistencies are detected.

Installation

To install pymilvus-pg, you can use pip after installing PDM or directly if the package is published:

# Ensure you have pdm installed if you are working with the source
# pip install pdm

# Install dependencies using pdm (from project root)
# pdm install

# Or install the package if available on PyPI (example)
# pip install pymilvus-pg

Usage

Here's a basic example of how to use pymilvus-pg:

from pymilvus_pg import MilvusPGClient as MilvusClient
from pymilvus.milvus_client import IndexParams
from pymilvus import DataType
import random
import time

# Initialize the client
# Replace with your Milvus URI and PostgreSQL connection string
milvus_client = MilvusClient(
    uri="http://localhost:19530",
    pg_conn_str="postgresql://user:password@localhost:5432/milvus_shadow",
)

collection_name = f"my_collection_{int(time.time())}"

# 1. Create schema
schema = milvus_client.create_schema()
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
schema.add_field("name", DataType.VARCHAR, max_length=100)
schema.add_field("age", DataType.INT64)
schema.add_field("json_field", DataType.JSON)
schema.add_field("array_field", DataType.ARRAY, element_type=DataType.INT64, max_capacity=10)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=8)

# 2. Create collection
milvus_client.create_collection(collection_name, schema)

# 3. Create index for the vector field
index_params = IndexParams()
index_params.add_index("embedding", metric_type="L2", index_type="IVF_FLAT", params={"nlist": 128})
milvus_client.create_index(collection_name, index_params)

# 4. Load collection
milvus_client.load_collection(collection_name)

# 5. Insert data
data_to_insert = [
    {
        "id": i,
        "name": f"item_{i}",
        "age": 20 + i,
        "json_field": {"category": f"cat_{i%3}", "value": i * 10},
        "array_field": [i, i + 1, i + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(10)
]
milvus_client.insert(collection_name, data_to_insert)
print(f"Inserted {len(data_to_insert)} entities.")

# 6. Query data (from Milvus, synchronized to PostgreSQL)
# Wait a bit for synchronization if operations are very fast
time.sleep(1) 
query_res = milvus_client.query(collection_name, filter_expression="age > 25")
print("Query results (age > 25):")
for entity in query_res:
    print(entity)

# 7. Delete data
ids_to_delete = [0, 1, 2]
milvus_client.delete(collection_name, ids=ids_to_delete)
print(f"Deleted entities with IDs: {ids_to_delete}")

# 8. Upsert data
data_to_upsert = [
    {
        "id": i,
        "name": f"updated_item_{i}",
        "age": 30 + i,
        "json_field": {"category": f"cat_updated_{i%3}", "value": i * 100},
        "array_field": [i*2, i*2 + 1, i*2 + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(3, 7) # Upserting IDs 3,4,5,6 (some new, some existing)
]
milvus_client.upsert(collection_name, data_to_upsert)
print(f"Upserted {len(data_to_upsert)} entities.")

# 9. Export data (from PostgreSQL)
# Wait for sync
time.sleep(1)
exported_data = milvus_client.export(collection_name)
print(f"Exported data from PostgreSQL for collection '{collection_name}':")
for row in exported_data:
    print(row)

# Clean up (optional)
# milvus_client.drop_collection(collection_name)

print("Demo finished.")

License

This project is licensed under the MIT License. See the LICENSE file for details (if one exists, otherwise specified in pyproject.toml).

Contributing

Contributions are welcome! Please feel free to submit a pull request or open an issue.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pymilvus_pg-0.1.5rc1.tar.gz (34.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pymilvus_pg-0.1.5rc1-py3-none-any.whl (35.6 kB view details)

Uploaded Python 3

File details

Details for the file pymilvus_pg-0.1.5rc1.tar.gz.

File metadata

  • Download URL: pymilvus_pg-0.1.5rc1.tar.gz
  • Upload date:
  • Size: 34.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.22.3 CPython/3.13.4 Darwin/22.6.0

File hashes

Hashes for pymilvus_pg-0.1.5rc1.tar.gz
Algorithm Hash digest
SHA256 a763feadf2104ac5862402ecb84c0c13f6d4bb7a28cd67f4c63fd52dd7dd5d82
MD5 f952ae59a44614e13bdf5d2e312983d1
BLAKE2b-256 d34438dc8553212059254bb9ef8d28f2a7458ca1086e89f9fe8ed7b1b4c86c71

See more details on using hashes here.

File details

Details for the file pymilvus_pg-0.1.5rc1-py3-none-any.whl.

File metadata

  • Download URL: pymilvus_pg-0.1.5rc1-py3-none-any.whl
  • Upload date:
  • Size: 35.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.22.3 CPython/3.13.4 Darwin/22.6.0

File hashes

Hashes for pymilvus_pg-0.1.5rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 835eaa9ca73a444129b0677852a998ca1d089794f16fcfebd28def3ab453eca4
MD5 e491474a3c2b57ea29f9e47a689b2a3f
BLAKE2b-256 53973fb6cf08ee4c8a00b8fa9530078f2c3e86581d370c1c00d410fa7da2939d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page