
Sync Milvus data to PostgreSQL for validation


PyMilvus PostgreSQL

pymilvus-pg is a Python library designed primarily for validating the correctness of data stored in Milvus. It mirrors Milvus write operations (inserts, deletes, upserts) to a PostgreSQL database in real time. By comparing the data in Milvus with this shadow copy in PostgreSQL, users can verify the consistency and accuracy of their Milvus deployments. Synchronization is the mechanism; data validation is the purpose.
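
The shadow-copy idea can be illustrated without either database. The following is a minimal sketch, not the library's implementation: `ShadowedStore` and its methods are hypothetical names, and two in-memory dicts stand in for Milvus and PostgreSQL. Every write is applied to both stores, so any later difference between them points to a fault in one of them.

```python
from typing import Any


class ShadowedStore:
    """Toy stand-in: `primary` plays Milvus, `shadow` plays PostgreSQL."""

    def __init__(self) -> None:
        self.primary: dict[int, dict[str, Any]] = {}
        self.shadow: dict[int, dict[str, Any]] = {}

    def upsert(self, rows: list[dict[str, Any]]) -> None:
        # Apply the same write to both stores, keyed by primary key.
        for row in rows:
            self.primary[row["id"]] = dict(row)
            self.shadow[row["id"]] = dict(row)

    def delete(self, ids: list[int]) -> None:
        # Remove the keys from both stores; missing keys are ignored.
        for pk in ids:
            self.primary.pop(pk, None)
            self.shadow.pop(pk, None)

    def mismatched_ids(self) -> list[int]:
        # A key is mismatched if it is missing on one side or the rows differ.
        all_ids = self.primary.keys() | self.shadow.keys()
        return sorted(pk for pk in all_ids if self.primary.get(pk) != self.shadow.get(pk))


store = ShadowedStore()
store.upsert([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
store.delete([2])
print(store.mismatched_ids())  # [] : both stores agree

store.primary[1]["name"] = "corrupted"  # simulate a fault in the primary store
print(store.mismatched_ids())  # [1]
```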

Features

  • Milvus Client Extension: Extends the MilvusClient functionality.
  • Data Synchronization: Keeps data in Milvus and a PostgreSQL shadow database synchronized.
  • Data Export: Allows exporting collection data from the shadow PostgreSQL instance.
  • Query Correctness Validation: Enables verification of Milvus query results by comparing them against PostgreSQL.
  • Milvus Data Correctness Validation: Enables full data comparison between Milvus and PostgreSQL.
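
To make the two validation features concrete, here is a hedged sketch of how rows from the two systems might be compared. `values_equal` and `diff_rows` are hypothetical helpers, not part of pymilvus-pg, and the rows are assumed to arrive as lists of dicts keyed by an `id` primary key. The comparison ignores row order and tolerates small float differences, since Milvus stores `FLOAT_VECTOR` components as 32-bit floats and a round trip may not be bit-exact.

```python
import math
from typing import Any


def values_equal(a: Any, b: Any, rel_tol: float = 1e-6) -> bool:
    """Compare two values, tolerating small float differences in nested data."""
    if isinstance(a, float) or isinstance(b, float):
        if isinstance(a, (int, float)) and isinstance(b, (int, float)):
            return math.isclose(a, b, rel_tol=rel_tol, abs_tol=1e-9)
        return False
    if isinstance(a, (list, tuple)) and isinstance(b, (list, tuple)):
        return len(a) == len(b) and all(values_equal(x, y, rel_tol) for x, y in zip(a, b))
    if isinstance(a, dict) and isinstance(b, dict):
        return a.keys() == b.keys() and all(values_equal(a[k], b[k], rel_tol) for k in a)
    return a == b


def diff_rows(left: list[dict], right: list[dict], pk: str = "id") -> list:
    """Return primary keys missing on one side or whose rows differ."""
    left_by_pk = {row[pk]: row for row in left}
    right_by_pk = {row[pk]: row for row in right}
    return sorted(
        key
        for key in left_by_pk.keys() | right_by_pk.keys()
        if key not in left_by_pk
        or key not in right_by_pk
        or not values_equal(left_by_pk[key], right_by_pk[key])
    )


# Illustrative data only: same rows in a different order, one real difference.
milvus_rows = [{"id": 1, "embedding": [0.1, 0.2]}, {"id": 2, "embedding": [0.3, 0.4]}]
pg_rows = [{"id": 2, "embedding": [0.3, 0.4000000001]}, {"id": 1, "embedding": [0.1, 0.25]}]
print(diff_rows(milvus_rows, pg_rows))  # [1]
```

The same helper covers both cases: pass the results of one filtered query from each side to check query correctness, or pass every row from each side for a full comparison.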

Installation

If the package is published on PyPI, install pymilvus-pg directly with pip. When working from the source tree, install PDM first and use it to install the dependencies:

# Ensure you have pdm installed if you are working with the source
# pip install pdm

# Install dependencies using pdm (from project root)
# pdm install

# Or install the package if available on PyPI (example)
# pip install pymilvus-pg
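
After installing, a quick check can confirm which versions ended up in the environment. This sketch uses only the standard library; `installed_version` is a helper name introduced here, and the distribution names are assumed to be `pymilvus-pg` and `pymilvus`, matching the pip command and the imports used in this README.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


for dist in ("pymilvus-pg", "pymilvus"):
    print(dist, installed_version(dist) or "not installed")
```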

Usage

Here's a basic example of how to use pymilvus-pg:

from pymilvus_pg import MilvusPGClient as MilvusClient
from pymilvus.milvus_client import IndexParams
from pymilvus import DataType
import random
import time

# Initialize the client
# Replace with your Milvus URI and PostgreSQL connection string
milvus_client = MilvusClient(
    uri="http://localhost:19530",
    pg_conn_str="postgresql://user:password@localhost:5432/milvus_shadow",
)

collection_name = f"my_collection_{int(time.time())}"

# 1. Create schema
schema = milvus_client.create_schema()
schema.add_field("id", DataType.INT64, is_primary=True, auto_id=False)
schema.add_field("name", DataType.VARCHAR, max_length=100)
schema.add_field("age", DataType.INT64)
schema.add_field("json_field", DataType.JSON)
schema.add_field("array_field", DataType.ARRAY, element_type=DataType.INT64, max_capacity=10)
schema.add_field("embedding", DataType.FLOAT_VECTOR, dim=8)

# 2. Create collection
milvus_client.create_collection(collection_name, schema)

# 3. Create index for the vector field
index_params = IndexParams()
index_params.add_index("embedding", metric_type="L2", index_type="IVF_FLAT", params={"nlist": 128})
milvus_client.create_index(collection_name, index_params)

# 4. Load collection
milvus_client.load_collection(collection_name)

# 5. Insert data
data_to_insert = [
    {
        "id": i,
        "name": f"item_{i}",
        "age": 20 + i,
        "json_field": {"category": f"cat_{i%3}", "value": i * 10},
        "array_field": [i, i + 1, i + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(10)
]
milvus_client.insert(collection_name, data_to_insert)
print(f"Inserted {len(data_to_insert)} entities.")

# 6. Query data (from Milvus, synchronized to PostgreSQL)
# Wait a bit for synchronization if operations are very fast
time.sleep(1) 
query_res = milvus_client.query(collection_name, filter="age > 25")
print("Query results (age > 25):")
for entity in query_res:
    print(entity)

# 7. Delete data
ids_to_delete = [0, 1, 2]
milvus_client.delete(collection_name, ids=ids_to_delete)
print(f"Deleted entities with IDs: {ids_to_delete}")

# 8. Upsert data
data_to_upsert = [
    {
        "id": i,
        "name": f"updated_item_{i}",
        "age": 30 + i,
        "json_field": {"category": f"cat_updated_{i%3}", "value": i * 100},
        "array_field": [i*2, i*2 + 1, i*2 + 2],
        "embedding": [random.random() for _ in range(8)]
    } for i in range(3, 7)  # Upsert IDs 3-6; all still exist after step 7, so each row is overwritten
]
milvus_client.upsert(collection_name, data_to_upsert)
print(f"Upserted {len(data_to_upsert)} entities.")

# 9. Export data (from PostgreSQL)
# Wait for sync
time.sleep(1)
exported_data = milvus_client.export(collection_name)
print(f"Exported data from PostgreSQL for collection '{collection_name}':")
for row in exported_data:
    print(row)

# Clean up (optional)
# milvus_client.drop_collection(collection_name)

print("Demo finished.")
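
The fixed `time.sleep(1)` calls in the example guess how long synchronization takes. A polling helper is one alternative; the sketch below is an assumption-laden illustration, not part of pymilvus-pg, and `wait_until` is a hypothetical name. It retries a caller-supplied check until it passes or a timeout expires.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float = 5.0, interval: float = 0.1) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Toy check that succeeds on the third call, standing in for "the shadow data is ready".
calls = {"n": 0}


def ready() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(ready, timeout=1.0, interval=0.01))  # True
```

In the demo above, the predicate could check that the exported data has the expected 7 rows (10 inserted, 3 deleted, 4 existing rows upserted), assuming the object returned by `export` supports `len()`; that return type is not stated here, so treat it as an assumption.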

License

This project is licensed under the MIT License. See the LICENSE file for details if one exists; otherwise the license is specified in pyproject.toml.

Contributing

Contributions are welcome! Please feel free to submit a pull request or open an issue.
