VectorWave: Seamless Auto-Vectorization Framework
🌟 Overview
VectorWave is an innovative framework that uses decorators to automatically save and manage the output of Python functions/methods in a Vector Database (Vector DB). Developers can convert function outputs into intelligent vector data with just a single line of code (@vectorize), without worrying about the complex processes of data collection, embedding generation, and Vector DB storage.
✨ Features
- @vectorize Decorator:
  - Static Data Collection: Upon script load, the function's source code, docstring, and metadata are saved once to the VectorWaveFunctions collection.
  - Dynamic Data Logging: Each time the function is called, its execution time, success/failure status, error logs, and "dynamic tags" are recorded in the VectorWaveExecutions collection.
- Distributed Tracing: Combines the @vectorize and @trace_span decorators to bundle the execution of complex, multi-step workflows under a single trace_id for analysis.
- Search Interface: Provides search_functions and search_executions to query the stored vector data (function definitions) and logs (execution history), facilitating the construction of RAG and monitoring systems.
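To make the static/dynamic split concrete, here is a minimal sketch of a decorator that captures function metadata once and logs every call. It is not VectorWave's implementation: the `record` decorator and the in-memory `FUNCTIONS` and `EXECUTIONS` lists are hypothetical stand-ins for @vectorize and the two collections.

```python
import functools
import inspect
import time

FUNCTIONS = []   # stand-in for the VectorWaveFunctions collection
EXECUTIONS = []  # stand-in for the VectorWaveExecutions collection


def record(**tags):
    """Hypothetical decorator: static data once, dynamic data per call."""
    def decorator(func):
        # Static data: captured once, when the decorator is applied.
        try:
            source = inspect.getsource(func)
        except (OSError, TypeError):
            source = ""  # source may be unavailable, e.g. in interactive sessions
        FUNCTIONS.append({
            "function_name": func.__name__,
            "docstring": inspect.getdoc(func) or "",
            "source_code": source,
        })

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Dynamic data: one log entry per call, including the custom tags.
            start = time.perf_counter()
            entry = {"function_name": func.__name__, "status": "SUCCESS", **tags}
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                entry["status"] = "ERROR"
                entry["error_message"] = str(exc)
                raise
            finally:
                entry["duration_ms"] = (time.perf_counter() - start) * 1000
                EXECUTIONS.append(entry)
        return wrapper
    return decorator


@record(team="billing")
def add(a, b):
    """Adds two numbers."""
    return a + b


add(1, 2)
```

After the call, `FUNCTIONS` holds one definition record and `EXECUTIONS` holds one log entry tagged with `team="billing"`.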
🚀 Usage
VectorWave consists of "storage" via decorators and "retrieval" via functions, and now includes execution flow tracing.
1. (Required) Database Initialization and Setup
import time
from vectorwave import (
    vectorize,
    initialize_database,
    search_functions,
    search_executions
)
# [New] Import trace_span separately for distributed tracing.
from vectorwave.monitoring.tracer import trace_span

# Needs to be called only once at script startup.
try:
    client = initialize_database()
    print("VectorWave DB initialization successful.")
except Exception as e:
    print(f"DB initialization failed: {e}")
    raise SystemExit(1)  # Stop the script with a non-zero exit code.
2. [Storage] Using @vectorize and Distributed Tracing
@vectorize acts as the Root of the trace, and applying @trace_span to internal functions bundles the workflow execution under a single trace_id.
# --- Child Span Function: Captures arguments ---
@trace_span(attributes_to_capture=['user_id', 'amount'])
def step_1_validate_payment(user_id: str, amount: int):
    """(Span) Validates payment. Logs user_id and amount."""
    print(f" [SPAN 1] Validating payment for {user_id}...")
    time.sleep(0.1)
    return True

@trace_span(attributes_to_capture=['user_id', 'receipt_id'])
def step_2_send_receipt(user_id: str, receipt_id: str):
    """(Span) Sends receipt."""
    print(f" [SPAN 2] Sending receipt {receipt_id}...")
    time.sleep(0.2)

# --- Root Function (acts as @trace_root) ---
@vectorize(
    search_description="Processes a user payment and returns a receipt.",
    sequence_narrative="After payment is complete, a receipt is sent via email.",
    team="billing",  # ⬅️ Custom tag (logged on all executions)
    priority=1       # ⬅️ Custom tag (execution importance)
)
def process_payment(user_id: str, amount: int):
    """(Root Span) Executes the user payment workflow."""
    print(f" [ROOT EXEC] process_payment: Starting workflow for {user_id}...")
    # When child functions are called, the same trace_id is automatically inherited via ContextVar.
    step_1_validate_payment(user_id=user_id, amount=amount)
    receipt_id = f"receipt_{user_id}_{amount}"
    step_2_send_receipt(user_id=user_id, receipt_id=receipt_id)
    print(" [ROOT DONE] process_payment")
    return {"status": "success", "receipt_id": receipt_id}

# --- Function Execution ---
print("Now calling 'process_payment'...")
# This single call will record a total of 3 execution logs (spans) in the DB,
# and all three logs will be tied to a single 'trace_id'.
process_payment("user_789", 5000)
3. [Retrieval ①] Search Function Definitions (for RAG)
# Search for functions related to 'payment' using natural language (vector).
print("\n--- Searching for 'payment' related functions ---")
payment_funcs = search_functions(
    query="User payment processing feature",
    limit=3
)
for func in payment_funcs:
    print(f" - Function: {func['properties']['function_name']}")
    print(f" - Description: {func['properties']['search_description']}")
    print(f" - Similarity (Distance): {func['metadata'].distance:.4f}")
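For RAG use, the search results would typically be turned into text context for a prompt. The sketch below assumes only the result shape used in the loop above (a `properties` dict with `function_name` and `search_description`); `build_context` is a hypothetical helper, and the sample list is hand-written rather than real search output.

```python
def build_context(results):
    """Formats search results as plain-text context for a prompt."""
    lines = []
    for item in results:
        props = item["properties"]
        lines.append(f"- {props['function_name']}: {props['search_description']}")
    return "\n".join(lines)


# Hand-written sample shaped like the results printed above (not real output).
sample = [
    {"properties": {
        "function_name": "process_payment",
        "search_description": "Processes a user payment and returns a receipt.",
    }},
]
print(build_context(sample))
```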
4. [Retrieval ②] Search Execution Logs (for Monitoring & Tracing)
search_executions can now retrieve all related execution logs (spans) based on a trace_id.
# 1. Find the Trace ID of a specific workflow (process_payment).
latest_payment_span = search_executions(
    limit=1,
    filters={"function_name": "process_payment"},
    sort_by="timestamp_utc",
    sort_ascending=False
)
trace_id = latest_payment_span[0]["trace_id"]

# 2. Retrieve all spans belonging to that Trace ID in chronological order.
print(f"\n--- Full Trace for ID ({trace_id[:8]}...) ---")
trace_spans = search_executions(
    limit=10,
    filters={"trace_id": trace_id},
    sort_by="timestamp_utc",
    sort_ascending=True  # Sort ascending to analyze workflow
)
for i, span in enumerate(trace_spans):
    print(f" - [Span {i+1}] {span['function_name']} ({span['duration_ms']:.2f}ms)")
# Captured arguments (user_id, amount, etc.) from child spans will also be visible.
# Expected Output:
# - [Span 1] step_1_validate_payment (100.81ms)
# - [Span 2] step_2_send_receipt (202.06ms)
# - [Span 3] process_payment (333.18ms)
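Once the spans of a trace are retrieved, a small helper can show how much of the root's duration is spent outside its child spans. This is a sketch under two assumptions: every non-root span is a direct child of the root, and the children run sequentially. `summarize_trace` is a hypothetical helper, and the sample values are copied from the expected output above.

```python
def summarize_trace(spans, root_name):
    """Returns the root duration and the time not covered by child spans."""
    root = next(s for s in spans if s["function_name"] == root_name)
    children_ms = sum(s["duration_ms"] for s in spans if s is not root)
    return {
        "total_ms": root["duration_ms"],
        "children_ms": children_ms,
        "overhead_ms": root["duration_ms"] - children_ms,
    }


# Values taken from the expected output shown above.
sample_spans = [
    {"function_name": "step_1_validate_payment", "duration_ms": 100.81},
    {"function_name": "step_2_send_receipt", "duration_ms": 202.06},
    {"function_name": "process_payment", "duration_ms": 333.18},
]
summary = summarize_trace(sample_spans, "process_payment")
print(f"overhead: {summary['overhead_ms']:.2f}ms")
```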
⚙️ Configuration
VectorWave automatically reads Weaviate database connection info and vectorization strategy from environment variables or a .env file.
Create a .env file in your project's root directory (e.g., where test_ex/example.py is located) and set the required values.
Vectorizer Strategy (VECTORIZER)
You can select the text vectorization method via the VECTORIZER environment variable in your test_ex/.env file.
| VECTORIZER Setting | Description | Required Additional Settings |
|---|---|---|
| huggingface | (Default Recommended) Uses the sentence-transformers library to vectorize on your local CPU. No API key is needed. | HF_MODEL_NAME (e.g., "sentence-transformers/all-MiniLM-L6-v2") |
| openai_client | (High-Performance) Uses the OpenAI Python client to vectorize with models like text-embedding-3-small. | OPENAI_API_KEY (A valid OpenAI API key) |
| weaviate_module | (Docker Delegate) Delegates vectorization to Weaviate's built-in module (e.g., text2vec-openai). | WEAVIATE_VECTORIZER_MODULE, OPENAI_API_KEY |
| none | Disables vectorization. Data is stored without vectors. | None |
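The table can be expressed as a quick pre-flight check that reports which settings are missing for the chosen strategy. This is an illustrative sketch, not part of VectorWave: `REQUIRED_SETTINGS` and `missing_settings` are hypothetical names, and treating an unset or unknown VECTORIZER as an error is an assumption, since VectorWave's own fallback behavior is not described here.

```python
REQUIRED_SETTINGS = {
    "huggingface": ["HF_MODEL_NAME"],
    "openai_client": ["OPENAI_API_KEY"],
    "weaviate_module": ["WEAVIATE_VECTORIZER_MODULE", "OPENAI_API_KEY"],
    "none": [],
}


def missing_settings(env):
    """Returns the names of required settings that are absent or empty in env."""
    strategy = env.get("VECTORIZER", "").strip().lower()
    if strategy not in REQUIRED_SETTINGS:
        # Assumption: reject unknown or unset values instead of guessing a default.
        raise ValueError(f"Unknown VECTORIZER value: {strategy!r}")
    return [name for name in REQUIRED_SETTINGS[strategy] if not env.get(name)]


print(missing_settings({"VECTORIZER": "weaviate_module", "OPENAI_API_KEY": "sk-..."}))
```

In a real script, passing `os.environ` instead of a literal dict would check the current process environment.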
.env File Examples
Configure your .env file according to the strategy you want to use.
Example 1: Using huggingface (Local, No API Key)
Uses a sentence-transformers model on your local machine. Ideal for testing without API keys.
# .env (Using HuggingFace)
# --- Basic Weaviate Connection ---
WEAVIATE_HOST=localhost
WEAVIATE_PORT=8080
WEAVIATE_GRPC_PORT=50051
# --- [Strategy 1] HuggingFace Config ---
VECTORIZER="huggingface"
HF_MODEL_NAME="sentence-transformers/all-MiniLM-L6-v2"
# (OPENAI_API_KEY is not required for this mode)
OPENAI_API_KEY=sk-...
# --- [Advanced] Custom Properties ---
CUSTOM_PROPERTIES_FILE_PATH=.weaviate_properties
FAILURE_MAPPING_FILE_PATH=.vectorwave_errors.json
RUN_ID=test-run-001
Example 2: Using openai_client (Python Client, High-Performance)
Directly calls the OpenAI API via the openai Python library.
# .env (Using OpenAI Python Client)
# --- Basic Weaviate Connection ---
WEAVIATE_HOST=localhost
WEAVIATE_PORT=8080
WEAVIATE_GRPC_PORT=50051
# --- [Strategy 2] OpenAI Client Config ---
VECTORIZER="openai_client"
# [Required] You must enter a valid OpenAI API key.
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx
# (HF_MODEL_NAME is not used in this mode)
HF_MODEL_NAME=...
# --- [Advanced] Custom Properties ---
CUSTOM_PROPERTIES_FILE_PATH=.weaviate_properties
FAILURE_MAPPING_FILE_PATH=.vectorwave_errors.json
RUN_ID=test-run-001
Example 3: Using weaviate_module (Docker Delegate)
Delegates vectorization to the Weaviate Docker container instead of Python. (See vw_docker.yml config).
# .env (Delegating to Weaviate Module)
# --- Basic Weaviate Connection ---
WEAVIATE_HOST=localhost
WEAVIATE_PORT=8080
WEAVIATE_GRPC_PORT=50051
# --- [Strategy 3] Weaviate Module Config ---
VECTORIZER="weaviate_module"
WEAVIATE_VECTORIZER_MODULE=text2vec-openai
WEAVIATE_GENERATIVE_MODULE=generative-openai
# [Required] The Weaviate container will read this API key.
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxx
# --- [Advanced] Custom Properties ---
CUSTOM_PROPERTIES_FILE_PATH=.weaviate_properties
FAILURE_MAPPING_FILE_PATH=.vectorwave_errors.json
RUN_ID=test-run-001
🚀 Advanced Failure Tracing (Error Code)
This feature enhances VectorWaveExecutions logs beyond a simple status: "ERROR" by adding an error_code property to the schema for granular failure analysis.
When a function wrapped by @vectorize or @trace_span fails, the error_code is determined automatically, using the first of the following three rules that applies:
- Custom Exception Attribute (Priority 1): The most specific method. If the raised exception object e has an e.error_code attribute, its value is used.

      class PaymentError(Exception):
          def __init__(self, message, error_code):
              super().__init__(message)
              self.error_code = error_code  # ⬅️ This attribute is detected.

      @vectorize(...)
      def process_payment(amount):
          if amount < 0:
              raise PaymentError("Amount < 0", error_code="PAYMENT_NEGATIVE_AMOUNT")

      # DB Log on execution: { "status": "ERROR", "error_code": "PAYMENT_NEGATIVE_AMOUNT" }

- Global Mapping File (Priority 2): Centrally manage common exceptions. VectorWave loads a JSON file specified by FAILURE_MAPPING_FILE_PATH in your .env (default: .vectorwave_errors.json) and maps the exception class name to a code.

  .vectorwave_errors.json Example:

      {
        "ValueError": "INVALID_INPUT",
        "KeyError": "CONFIG_MISSING",
        "TypeError": "INVALID_INPUT"
      }

      import os

      @vectorize(...)
      def get_config(key):
          return os.environ[key]  # ⬅️ Raises KeyError

      # DB Log on execution: { "status": "ERROR", "error_code": "CONFIG_MISSING" }

- Default (Priority 3): If neither of the above applies, the exception's class name (e.g., "ZeroDivisionError") is stored as the default error_code.
[Usage] Searching for Failures:
You can now filter for specific failure types using search_executions.
# Find all failure logs categorized as "INVALID_INPUT"
invalid_logs = search_executions(
    filters={"error_code": "INVALID_INPUT"},
    limit=10
)
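The three-priority resolution described in this section can be sketched as a single function. This is an illustration of the stated rules, not VectorWave's source code: `resolve_error_code` is a hypothetical name, and the mapping dict stands in for the contents of the JSON mapping file.

```python
def resolve_error_code(exc, mapping):
    """Applies the three priorities in order and returns the error code."""
    # Priority 1: an error_code attribute on the exception itself.
    code = getattr(exc, "error_code", None)
    if code is not None:
        return str(code)
    # Priority 2: the exception class name looked up in the global mapping.
    name = type(exc).__name__
    if name in mapping:
        return mapping[name]
    # Priority 3: fall back to the exception class name.
    return name


class PaymentError(Exception):
    def __init__(self, message, error_code):
        super().__init__(message)
        self.error_code = error_code


# Stand-in for the parsed contents of .vectorwave_errors.json.
mapping = {"ValueError": "INVALID_INPUT", "KeyError": "CONFIG_MISSING"}

print(resolve_error_code(PaymentError("Amount < 0", "PAYMENT_NEGATIVE_AMOUNT"), mapping))
print(resolve_error_code(KeyError("HOME"), mapping))
print(resolve_error_code(ZeroDivisionError(), mapping))
```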
Custom Properties and Dynamic Execution Tagging
VectorWave can store user-defined metadata in addition to static data (function definitions) and dynamic data (execution logs). This works in two steps.
Step 1: Define Custom Schema (Tag "Allow-list")
Create a JSON file at the path specified by CUSTOM_PROPERTIES_FILE_PATH in your .env file (default: .weaviate_properties).
This file instructs VectorWave to add new properties (columns) to the Weaviate collections. This file acts as an "allow-list" for all custom tags.
.weaviate_properties Example:
{
  "run_id": {
    "data_type": "TEXT",
    "description": "The ID of the specific test run"
  },
  "experiment_id": {
    "data_type": "TEXT",
    "description": "Identifier for the experiment"
  },
  "team": {
    "data_type": "TEXT",
    "description": "The team responsible for this function"
  },
  "priority": {
    "data_type": "INT",
    "description": "Execution priority level"
  }
}
- This definition will add run_id, experiment_id, team, and priority properties to both the VectorWaveFunctions and VectorWaveExecutions collections.
Step 2: Dynamic Execution Tagging (Adding Values)
When a function is executed, VectorWave adds tags to the VectorWaveExecutions log. These tags are collected and merged from two sources.
1. Global Tags (Environment Variables)
VectorWave looks for environment variables matching the UPPERCASE name of the keys defined in Step 1 (e.g., RUN_ID, EXPERIMENT_ID). Found values are loaded as global_custom_values and added to all execution logs. Ideal for run-wide metadata.
2. Function-Specific Tags (Decorator)
You can pass tags as keyword arguments (**execution_tags) directly to the @vectorize decorator. This is ideal for function-specific metadata.
# --- .env file ---
# RUN_ID=global-run-abc
# TEAM=default-team
@vectorize(
    search_description="Process payment",
    sequence_narrative="...",
    team="billing",  # <-- Function-specific tag
    priority=1       # <-- Function-specific tag
)
def process_payment():
    pass

@vectorize(
    search_description="Another function",
    sequence_narrative="...",
    run_id="override-run-xyz"  # <-- Overrides the global tag
)
def other_function():
    pass
- Validation (Important): Tags (global or function-specific) will only be saved to Weaviate if their key (e.g., run_id, team, priority) was first defined in the .weaviate_properties file (Step 1). Tags not defined in the schema are ignored, and a warning is logged at startup.
- Priority (Override): If a tag key is defined in both places (e.g., global RUN_ID in .env and run_id="override-xyz" in the decorator), the function-specific tag from the decorator always wins.
Resulting Logs:
- process_payment() execution log: {"run_id": "global-run-abc", "team": "billing", "priority": 1}
- other_function() execution log: {"run_id": "override-run-xyz", "team": "default-team"}
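The merge and validation rules can be summarized in a few lines of Python. This is a sketch of the behavior described in this section, not VectorWave's code: `merge_tags` is a hypothetical helper, and the plain dict `env` stands in for the process environment.

```python
def merge_tags(allowed_keys, env, decorator_tags):
    """Merges global and function-specific tags; returns (tags, ignored_keys)."""
    merged = {}
    # Global tags: look up the UPPERCASE name of each allowed key.
    for key in allowed_keys:
        value = env.get(key.upper())
        if value is not None:
            merged[key] = value
    # Function-specific tags: override globals, but only if allow-listed.
    ignored = []
    for key, value in decorator_tags.items():
        if key in allowed_keys:
            merged[key] = value
        else:
            ignored.append(key)  # the section above says a warning is logged for these
    return merged, ignored


allowed = ["run_id", "experiment_id", "team", "priority"]
env = {"RUN_ID": "global-run-abc", "TEAM": "default-team"}

print(merge_tags(allowed, env, {"team": "billing", "priority": 1}))
print(merge_tags(allowed, env, {"run_id": "override-run-xyz"}))
```

With the same inputs as the example above, the two calls reproduce the resulting logs for process_payment() and other_function().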
🚀 Real-time Error Alerting (Webhook)
Beyond just logging, VectorWave can send real-time notifications via webhook the instant an error occurs. This functionality is built directly into the tracer and can be activated simply by updating your .env file.
How it Works:
- An exception is raised within a function decorated by @trace_span or @vectorize.
- The tracer catches the exception in its except block and immediately calls the alerter object.
- The alerter reads the .env configuration and uses the WebhookAlerter to dispatch the error details to your specified URL.
- The notification is optimized for Discord Embeds, sending a rich report including the error code, trace ID, captured attributes (user_id, etc.), and the full stack trace.
How to Enable:
Add the following two variables to your test_ex/.env file (or set them as environment variables):
# .env file
# 1. Set the alerter strategy to 'webhook'. (Default: "none")
ALERTER_STRATEGY="webhook"
# 2. Provide your webhook URL from Discord, Slack, etc.
ALERTER_WEBHOOK_URL="https://discord.com/api/webhooks/YOUR_HOOK_ID/..."
With just these two lines, running test_ex/example.py will now instantly send a Discord alert when the CustomValueError is raised.
Extensibility (Strategy Pattern): The alerting system is built on a Strategy Pattern. You can easily extend it by implementing the BaseAlerter interface to support other channels like email, PagerDuty, or more.
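As a rough illustration of the Strategy Pattern mentioned above, the sketch below defines an abstract alerter and selects a concrete one by name. The method names of VectorWave's actual BaseAlerter are not shown in this README, so `BaseAlerterSketch`, `notify`, `InMemoryAlerter`, and `get_alerter` are all assumptions made for this example.

```python
from abc import ABC, abstractmethod


class BaseAlerterSketch(ABC):
    """Hypothetical stand-in for an alerter interface (names are assumptions)."""

    @abstractmethod
    def notify(self, error_info: dict) -> None:
        """Delivers one error report."""


class NoOpAlerter(BaseAlerterSketch):
    """Used when alerting is disabled."""

    def notify(self, error_info: dict) -> None:
        pass


class InMemoryAlerter(BaseAlerterSketch):
    """Collects reports in a list; a real strategy might send email instead."""

    def __init__(self):
        self.sent = []

    def notify(self, error_info: dict) -> None:
        self.sent.append(
            f"[{error_info['error_code']}] trace={error_info['trace_id']}"
        )


def get_alerter(strategy: str) -> BaseAlerterSketch:
    """Picks a strategy by name, similar in spirit to ALERTER_STRATEGY."""
    strategies = {"none": NoOpAlerter, "memory": InMemoryAlerter}
    return strategies.get(strategy, NoOpAlerter)()


alerter = get_alerter("memory")
alerter.notify({"error_code": "INVALID_INPUT", "trace_id": "abc123"})
print(alerter.sent)
```

The calling code depends only on the abstract `notify` method, so adding a new channel means adding one class and one entry in the lookup table.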
🤝 Contributing
Bug reports, feature requests, and code contributions are all welcome. For details, please see CONTRIBUTING.md.
📜 License
This project is distributed under the MIT License. See the LICENSE file for details.