Sync Milvus data to PostgreSQL for validation

PyMilvus PostgreSQL

pymilvus-pg is a Python library for validating the correctness of data stored in Milvus. It mirrors Milvus write operations (inserts, deletes, upserts) to a PostgreSQL database in real time, producing a shadow copy of each collection. Comparing the data in Milvus with that shadow copy lets users verify the consistency and accuracy of their Milvus deployments. Synchronization is the means; data validation is the purpose.

Features

  • Milvus Client Extension: Provides MilvusPGClient, which extends MilvusClient and can be imported in its place.
  • Data Synchronization: Keeps data in Milvus and a PostgreSQL shadow database synchronized.
  • Data Export: Allows exporting collection data from the shadow PostgreSQL instance.
  • Query Correctness Validation: Enables verification of Milvus query results by comparing them against PostgreSQL.
  • Milvus Data Correctness Validation: Enables full data comparison between Milvus and PostgreSQL.
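
The comparison behind the two validation features can be pictured as a primary-key diff between two sets of rows. The sketch below is illustrative only: `diff_rows` and `values_equal` are hypothetical helpers, not part of pymilvus-pg. It assumes rows are dicts that share an `id` field, and it compares floats with a tolerance because FLOAT_VECTOR values are stored as 32-bit floats and may not match Python floats exactly.

```python
import math
from typing import Any


def values_equal(a: Any, b: Any, tol: float = 1e-6) -> bool:
    """Compare two field values, using a tolerance for floats and float lists."""
    if isinstance(a, float) or isinstance(b, float):
        if isinstance(a, (int, float)) and isinstance(b, (int, float)):
            return math.isclose(a, b, rel_tol=tol, abs_tol=tol)
        return False
    if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
        return len(a) == len(b) and all(values_equal(x, y, tol) for x, y in zip(a, b))
    if isinstance(a, dict) and isinstance(b, dict):
        return a.keys() == b.keys() and all(values_equal(a[k], b[k], tol) for k in a)
    return a == b


def diff_rows(milvus_rows, pg_rows, pk="id", tol=1e-6):
    """Return primary keys missing on either side and keys whose fields differ."""
    left = {row[pk]: row for row in milvus_rows}
    right = {row[pk]: row for row in pg_rows}
    mismatched = sorted(
        key for key in left.keys() & right.keys()
        if not values_equal(left[key], right[key], tol)
    )
    return {
        "only_in_milvus": sorted(left.keys() - right.keys()),
        "only_in_pg": sorted(right.keys() - left.keys()),
        "mismatched": mismatched,
    }
```

An empty result in all three lists would mean the two sides agree under the chosen tolerance; any other result names the primary keys worth inspecting.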

Installation

Install the published package from PyPI with pip. When working from a source checkout, install the dependencies with PDM instead:

# Install the published package from PyPI
pip install pymilvus-pg

# Or, from a source checkout: install PDM, then the project dependencies (run from the project root)
# pip install pdm
# pdm install
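
To confirm which version ended up in the active environment after `pip install`, the standard library's `importlib.metadata` can be queried. This is a small sketch; `installed_version` is a hypothetical helper name, and the distribution name `pymilvus-pg` is taken from the install command above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return version(dist_name)
    except PackageNotFoundError:
        return None


# Report the version found in the current environment, if any.
print("pymilvus-pg:", installed_version("pymilvus-pg") or "not installed")
```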

Usage

Here's a basic example of how to use pymilvus-pg:

from pymilvus_pg import MilvusPGClient as MilvusClient
from pymilvus.milvus_client import IndexParams
from pymilvus import DataType
import random
import time

# Initialize the client
# Replace with your Milvus URI and PostgreSQL connection string
milvus_client = MilvusClient(
    uri="http://localhost:19530",
    pg_conn_str="postgresql://user:password@localhost:5432/milvus_shadow",
)

collection_name = f"my_collection_{int(time.time())}"

# 1. Create schema
schema = milvus_client.create_schema()
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
schema.add_field("name", DataType.VARCHAR, max_length=100)
schema.add_field("age", DataType.INT64)
schema.add_field("json_field", DataType.JSON)
schema.add_field("array_field", DataType.ARRAY, element_type=DataType.INT64, max_capacity=10)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=8)

# 2. Create collection
milvus_client.create_collection(collection_name, schema=schema)

# 3. Create index for the vector field
index_params = IndexParams()
index_params.add_index("embedding", metric_type="L2", index_type="IVF_FLAT", params={"nlist": 128})
milvus_client.create_index(collection_name, index_params)

# 4. Load collection
milvus_client.load_collection(collection_name)

# 5. Insert data
data_to_insert = [
    {
        "id": i,
        "name": f"item_{i}",
        "age": 20 + i,
        "json_field": {"category": f"cat_{i%3}", "value": i * 10},
        "array_field": [i, i + 1, i + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(10)
]
milvus_client.insert(collection_name, data_to_insert)
print(f"Inserted {len(data_to_insert)} entities.")

# 6. Query data (from Milvus, synchronized to PostgreSQL)
# Wait briefly for synchronization to finish if operations run back to back
time.sleep(1)
# MilvusClient.query takes the boolean expression as `filter`
query_res = milvus_client.query(collection_name, filter="age > 25")
print("Query results (age > 25):")
for entity in query_res:
    print(entity)

# 7. Delete data
ids_to_delete = [0, 1, 2]
milvus_client.delete(collection_name, ids=ids_to_delete)
print(f"Deleted entities with IDs: {ids_to_delete}")

# 8. Upsert data
data_to_upsert = [
    {
        "id": i,
        "name": f"updated_item_{i}",
        "age": 30 + i,
        "json_field": {"category": f"cat_updated_{i%3}", "value": i * 100},
        "array_field": [i*2, i*2 + 1, i*2 + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(3, 7)  # Upserting IDs 3, 4, 5, 6 (all still present after the delete in step 7, so each row is updated)
]
milvus_client.upsert(collection_name, data_to_upsert)
print(f"Upserted {len(data_to_upsert)} entities.")

# 9. Export data (from PostgreSQL)
# Wait for sync
time.sleep(1)
exported_data = milvus_client.export(collection_name)
print(f"Exported data from PostgreSQL for collection '{collection_name}':")
for row in exported_data:
    print(row)

# Clean up (optional)
# milvus_client.drop_collection(collection_name)

print("Demo finished.")
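
To reason about what the export in step 9 should contain, the write sequence above can be replayed against a plain dictionary. This is a sketch, not part of pymilvus-pg: `replay` and the `(kind, payload)` operation tuples are hypothetical, only the `id` and `name` fields are modeled, and inserts are assumed never to reuse an existing primary key.

```python
def replay(operations):
    """Apply (kind, payload) write operations to a dict keyed by primary key."""
    state = {}
    for kind, payload in operations:
        if kind in ("insert", "upsert"):
            # Assumption: inserts never reuse a key, so both kinds can overwrite.
            for row in payload:
                state[row["id"]] = dict(row)
        elif kind == "delete":
            for pk in payload:
                state.pop(pk, None)
        else:
            raise ValueError(f"unknown operation {kind!r}")
    return state


# The same sequence as steps 5, 7, and 8 of the demo, reduced to two fields.
operations = [
    ("insert", [{"id": i, "name": f"item_{i}"} for i in range(10)]),
    ("delete", [0, 1, 2]),
    ("upsert", [{"id": i, "name": f"updated_item_{i}"} for i in range(3, 7)]),
]
expected = replay(operations)
print(sorted(expected))
print(expected[3]["name"], expected[7]["name"])
```

Under these assumptions the replay leaves IDs 3 through 9, with IDs 3 to 6 carrying the updated names and IDs 7 to 9 unchanged.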

License

This project is licensed under the MIT License. See the LICENSE file for details; if the repository has no LICENSE file, the license is declared in pyproject.toml.

Contributing

Contributions are welcome! Please feel free to submit a pull request or open an issue.
