VectorShift Python SDK
A Python SDK, currently in development, for creating and interacting with VectorShift pipelines.
Documentation
The VectorShift SDK provides a Python interface for creating, managing, and executing AI pipelines on the VectorShift platform.
For comprehensive API documentation, visit: https://docs.vectorshift.ai/api-reference/overview
Installation
pip install vectorshift
Usage
Authentication
Set the API key in code:
vectorshift.api_key = 'sk-****'
or set it as an environment variable:
export VECTORSHIFT_API_KEY='sk-***'
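A common pattern is to prefer an explicitly passed key and fall back to the environment variable. `resolve_api_key` below is a hypothetical helper sketching that pattern; it is not part of the SDK:

```python
import os

def resolve_api_key(explicit=None):
    """Prefer an explicitly passed key, otherwise fall back to the
    VECTORSHIFT_API_KEY environment variable (None if neither is set)."""
    return explicit or os.environ.get("VECTORSHIFT_API_KEY")
```

You could then write `vectorshift.api_key = resolve_api_key()` instead of hard-coding the key.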
Create a new pipeline using the pipeline builder API
import vectorshift
from vectorshift.pipeline import Pipeline
vectorshift.api_key = "your api key here"
pipeline = Pipeline.new(name="basic-llm-pipeline")
inp = pipeline.add(name="input_0").input(input_type="string")
llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)
out = pipeline.add(name="output_0").output(value=llm.response)
pipeline.save()
The builder provides full IDE autocomplete for all 200+ node types. You can also still instantiate nodes directly:
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode
input_node = InputNode(node_name="input_0")
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response
)
pipeline = Pipeline.new(
    name="basic-llm-pipeline",
    nodes=[input_node, llm_node, output_node]
)
Basic RAG Pipeline
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, KnowledgeBaseNode, OutputNode, LlmNode
from vectorshift import KnowledgeBase
# Set API key
vectorshift.api_key = "your api key here"
# Create input node for user query
input_node = InputNode(
    node_name="Query",
)
# Fetch knowledge base
knowledge_base = KnowledgeBase.fetch(name="your knowledge base name here")
# Create knowledge base node to retrieve relevant documents
knowledge_base_node = KnowledgeBaseNode(
    query=input_node.text,
    knowledge_base=knowledge_base,
    format_context_for_llm=True,
)
# Create LLM node that uses both the query and retrieved documents
llm_node = LlmNode(
    system="You are a helpful assistant that answers questions based on the provided context documents.",
    prompt=f"Query: {input_node.text}\n\nContext: {knowledge_base_node.formatted_text}",
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)
# Create output node for the LLM response
output_node = OutputNode(
    node_name="Response",
    value=llm_node.response
)
# Create the RAG pipeline
rag_pipeline = Pipeline.new(
    name="rag-pipeline",
    nodes=[input_node, knowledge_base_node, llm_node, output_node],
)
Batch Mode (List Processing)
Use pipeline.add_batch to run a node in batch mode. The node will process lists of inputs in parallel.
from vectorshift.pipeline import Pipeline
pipeline = Pipeline.new(name="batch-llm-pipeline")
inp = pipeline.add(name="input_0").input(input_type="vec<string>")
llm = pipeline.add_batch(name="batch_llm").llm(
    provider="openai",
    model="gpt-4o-mini",
    prompt=inp.value,
    system="You are concise. Reply in one sentence.",
    stream=False
)
pipeline.save()
result = pipeline.run(inputs={
    "input_0": [
        "Say hello to New York",
        "Say hello to Tokyo",
        "Say hello to Lagos",
    ]
})
You can also set batch mode explicitly on any node:
from vectorshift.pipeline import InputNode, PipelineNode, SplitTextNode, OutputNode, Pipeline
sub_pipeline = Pipeline.fetch(name="your sub pipeline")
input_node = InputNode(node_name="input_0")
split_text_node = SplitTextNode(
    node_name="split_text_node",
    text=input_node.text,
    delimiter="newline"
)
pipeline_node = PipelineNode(
    pipeline_id=sub_pipeline.id,
    node_name="sub_pipeline",
    input_0=split_text_node.processed_text,
    execution_mode="batch"
)
output_node = OutputNode(node_name="output_0", value=pipeline_node.output_0)
main_pipeline = Pipeline.new(
    name="batched-pipeline",
    nodes=[input_node, split_text_node, pipeline_node, output_node]
)
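Conceptually, the split-then-batch wiring computes a map over the split parts. The following plain-Python analogue (with `run_sub` as a stand-in for the sub-pipeline, not SDK code) illustrates the semantics:

```python
def run_sub(text: str) -> str:
    # Stand-in for the sub-pipeline; here it just uppercases its input.
    return text.upper()

def run_batched(text: str) -> list[str]:
    # SplitTextNode with delimiter="newline" splits the input into parts;
    # the batched PipelineNode then maps the sub-pipeline over each part.
    parts = text.split("\n")
    return [run_sub(part) for part in parts]

print(run_batched("alpha\nbeta"))  # ['ALPHA', 'BETA']
```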
Streaming
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode
# Set API key
vectorshift.api_key = 'your api key here'
# Create input node
input_node = InputNode(node_name="input_0")
# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True  # Enable streaming
)
# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>"
)
# Create and save the pipeline
pipeline = Pipeline.new(
    name="streaming-llm-pipeline-1",
    nodes=[input_node, llm_node, output_node]
)
# Run pipeline with streaming enabled
input_data = {"input_0": "Tell me a story about a brave adventurer"}
# Stream the response chunks
import json

for chunk in pipeline.run(input_data, stream=True):
    try:
        # Each chunk is a server-sent-events line of the form b'data: {...}'
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            data = json.loads(chunk_str[6:])  # Remove the 'data: ' prefix
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If a chunk fails to parse, skip it and continue
        continue
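The chunk handling above can be factored into a small, SDK-independent helper that parses one server-sent-events chunk and returns the decoded payload (or `None` for anything that is not a well-formed data line):

```python
import json

def parse_sse_chunk(chunk):
    """Parse one streamed chunk of the form b'data: {...}'.
    Returns the decoded JSON payload, or None if the chunk is not
    a well-formed data line."""
    try:
        text = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
        if not text.startswith("data: "):
            return None
        return json.loads(text[len("data: "):])
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
```

With this helper, the streaming loop body reduces to checking `parse_sse_chunk(chunk)` for the output you care about.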
Async Usage
Async SDK methods are available by prefixing the method name with a (for example, run becomes arun). Here we fetch a pipeline by name, run it with a particular input, and await the result.
import asyncio
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode
vectorshift.api_key = "your api key here"
pipeline = Pipeline.fetch(name="your pipeline name here")
input_data = {"input_0": "Hello, how are you?"}
result = asyncio.run(pipeline.arun(input_data))
print(result)
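The async methods also make it easy to fan out several runs concurrently with asyncio.gather. The sketch below uses a stand-in coroutine (`fake_arun`) in place of a real `pipeline.arun`, so the pattern can be shown without an API call:

```python
import asyncio

async def run_many(arun, texts):
    # Launch one arun call per input and await them all concurrently.
    return await asyncio.gather(*(arun({"input_0": t}) for t in texts))

# Stand-in coroutine used in place of pipeline.arun for illustration.
async def fake_arun(inputs):
    await asyncio.sleep(0)
    return {"output_0": inputs["input_0"].upper()}

results = asyncio.run(run_many(fake_arun, ["hello", "world"]))
print(results)  # [{'output_0': 'HELLO'}, {'output_0': 'WORLD'}]
```

With a real pipeline, you would pass `pipeline.arun` in place of `fake_arun`; asyncio.gather preserves the input order in its results.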
Parallel Knowledge Base Upload
We can use the async methods to parallelize a bulk upload of the files in a directory to a knowledge base. The script below takes a vectorstore name and a local directory to upload.
import asyncio
import os
import argparse
import vectorshift
from vectorshift.knowledge_base import KnowledgeBase, IndexingConfig
from dotenv import load_dotenv
from tqdm import tqdm
load_dotenv()
def upload_documents(vectorstore_name, upload_dir, max_concurrent=16):
    # Read the API key from the environment (loaded from .env above)
    vectorshift.api_key = os.getenv('VECTORSHIFT_API_KEY')
    vectorstore = KnowledgeBase.fetch(name=vectorstore_name)
    num_files = sum(len(files) for _, _, files in os.walk(upload_dir))
    print(f'Number of files in the upload directory: {num_files}')

    async def upload_document(semaphore, script_path, document_title, dirpath):
        async with semaphore:
            try:
                # Create indexing configuration
                indexing_config = IndexingConfig(
                    chunk_size=512,
                    chunk_overlap=0,
                    file_processing_implementation='Default',
                    index_tables=False,
                    analyze_documents=False
                )
                response = await vectorstore.aindex_document(
                    document_type='file',
                    document=script_path,
                    indexing_config=indexing_config
                )
                return f"Response for {document_title} in directory {dirpath}: {response}"
            except Exception as e:
                return f"Response for {document_title} in directory {dirpath}: Failed due to {e}"

    async def upload_all_documents():
        # Create semaphore to limit concurrent uploads
        semaphore = asyncio.Semaphore(max_concurrent)
        all_files = []
        for dirpath, dirnames, filenames in os.walk(upload_dir):
            for script_file in filenames:
                script_path = os.path.join(dirpath, script_file)
                document_title = os.path.basename(script_path)
                all_files.append((script_path, document_title, dirpath))
        # Create tasks for all files
        tasks = [
            upload_document(semaphore, script_path, document_title, dirpath)
            for script_path, document_title, dirpath in all_files
        ]
        # Process all tasks with a progress bar
        with tqdm(total=len(all_files), desc="Uploading documents") as pbar:
            for coro in asyncio.as_completed(tasks):
                result = await coro
                if "Failed due to" in result:
                    print(f"Error: {result}")
                else:
                    print(result)
                pbar.update(1)

    asyncio.run(upload_all_documents())

if __name__ == "__main__":
    # Set up command line argument parsing
    parser = argparse.ArgumentParser(description='Upload documents to a VectorStore.')
    parser.add_argument('--vectorstore_name', type=str, required=True, help='Name of the VectorStore to upload documents to.')
    parser.add_argument('--upload_dir', type=str, required=True, help='Directory path of documents to upload.')
    parser.add_argument('--max_concurrent', type=int, default=16, help='Maximum number of concurrent uploads.')
    args = parser.parse_args()
    upload_documents(args.vectorstore_name, args.upload_dir, args.max_concurrent)
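The semaphore pattern in the script generalizes to any bounded-concurrency fan-out. A minimal sketch, using a stand-in coroutine (`fake_upload`) in place of a real upload call:

```python
import asyncio

async def bounded_gather(coro_fn, items, max_concurrent=16):
    # Run coro_fn(item) for every item, at most max_concurrent at a time.
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await coro_fn(item)

    # gather preserves the order of items in its results.
    return await asyncio.gather(*(guarded(item) for item in items))

# Stand-in coroutine in place of a real upload call.
async def fake_upload(path):
    await asyncio.sleep(0)
    return f"uploaded {path}"

results = asyncio.run(bounded_gather(fake_upload, ["a.txt", "b.txt"], max_concurrent=2))
print(results)  # ['uploaded a.txt', 'uploaded b.txt']
```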
Pipeline Management
Fetch by Name or ID
# Fetch by name
pipeline = Pipeline.fetch(name="My Pipeline")
# Fetch by ID
pipeline = Pipeline.fetch(id="pipeline-id-123")
# Fetch a specific version or branch
pipeline = Pipeline.fetch(id="pipeline-id-123", version="1.2.0")
pipeline = Pipeline.fetch(id="pipeline-id-123", branch_id="branch-id")
# Fetch from another user/org
pipeline = Pipeline.fetch(name="Shared Pipeline", username="other_user", org_name="my-org")
List Pipelines
pipelines = Pipeline.list()
# With filtering and pagination
pipelines = Pipeline.list(folder_id="folder-123", include_shared=True, offset=0, limit=20)
Duplicate a Pipeline
new_pipeline = existing_pipeline.duplicate(name="My Pipeline Copy")
Version Control
The SDK supports version control with semantic versioning and deploy controls via pipeline.save().
from vectorshift.pipeline import Pipeline, BumpLevel
pipeline = Pipeline.fetch(name="basic-llm-pipeline")
# Save and deploy (deploy=True is the default)
pipeline.save()
# Save with a version bump
pipeline.save(bump=BumpLevel.MINOR, description="Updated model to gpt-4o")
# Save without deploying
pipeline.save(deploy=False)
BumpLevel supports PATCH, MINOR, and MAJOR semantic version bumps.
To update a pipeline's nodes, modify them in code and save:
from vectorshift.pipeline import Pipeline, BumpLevel
pipeline = Pipeline.fetch(name="basic-llm-pipeline")
pipeline.nodes.clear()
inp = pipeline.add(name="input_0").input(input_type="string")
llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o",
    temperature=0.7
)
out = pipeline.add(name="output_0").output(value=llm.response)
pipeline.save(bump=BumpLevel.MINOR, description="Upgraded model to gpt-4o")
Conditional Logic
Use ConditionNode to add branching logic to your pipelines.
from vectorshift.pipeline import (
    Pipeline, InputNode, OutputNode, TextNode, MergeNode,
    ConditionNode, Operator, LogicalOp, Clause, ConditionGroup,
)
score_input = InputNode(node_name="score", input_type="int32")
condition = ConditionNode(
    node_name="grade_check",
    conditions=[
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 90)],
        ),
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 50)],
        ),
    ],
)
# Each condition group creates a path output (path_0, path_1, ..., path_else)
msg_excellent = TextNode(node_name="msg_a", text="Excellent!", dependencies=[condition.path_0])
msg_passing = TextNode(node_name="msg_b", text="You passed.", dependencies=[condition.path_1])
msg_fail = TextNode(node_name="msg_c", text="Did not pass.", dependencies=[condition.path_else])
merge = MergeNode(
    node_name="merge",
    function="first",
    type="string",
    fields=[msg_excellent.text, msg_passing.text, msg_fail.text],
)
output = OutputNode(node_name="result", output_type="string", value=merge.output)
pipeline = Pipeline.new(
    name="conditional-pipeline",
    nodes=[score_input, condition, msg_excellent, msg_passing, msg_fail, merge, output],
)
Conditions support 20+ operators including comparisons (EQUAL, GREATER_THAN, etc.), text checks (TEXT_CONTAINS, TEXT_STARTS_WITH, etc.), and unary checks (IS_EMPTY, IS_TRUE, etc.). Multiple clauses can be combined with LogicalOp.AND / LogicalOp.OR.
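Assuming the condition groups are evaluated top to bottom with the first matching group taking its path (which the merge with function="first" relies on), the routing above is equivalent to this plain-Python decision:

```python
def grade_message(score: int) -> str:
    # Mirrors the ConditionNode above: groups are checked in order,
    # the first match wins, otherwise the else path is taken.
    if score >= 90:
        return "Excellent!"    # path_0
    if score >= 50:
        return "You passed."   # path_1
    return "Did not pass."     # path_else
```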
Chatbots
Run a chatbot. This code lets you chat with your chatbot from the terminal. Since we pass conversation_id=None in the initial run, the chatbot starts a new conversation. By passing the conversation_id returned by chatbot.run back in, we continue the conversation, and the chatbot sees previous responses.
from vectorshift import Chatbot
chatbot = Chatbot.fetch(name='your chatbot name')
conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id)
    conversation_id = response['conversation_id']
    print(response['output_message'])
Streaming Chatbot
from vectorshift import Chatbot
import json

chatbot = Chatbot.fetch(name='your chatbot name')
conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    # Keep the previous conversation_id; the stream may provide an updated one
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if chunk_str.startswith('data: '):
                data = json.loads(chunk_str[6:])  # Remove the 'data: ' prefix
                if data.get('conversation_id'):
                    conversation_id = data.get('conversation_id')
                elif data.get('output_value') and data.get('type') == 'stream':
                    print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming is complete
Chatbot File Upload
from vectorshift import Chatbot
import os
import json
chatbot = Chatbot.fetch(name='your chatbot name')
conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    # Handle file upload
    if user_input.startswith("add_file "):
        file_path = user_input[9:]  # Remove "add_file " prefix
        if os.path.isfile(file_path):
            try:
                upload_response = chatbot.upload_files(file_paths=[file_path], conversation_id=conversation_id)
                conversation_id = upload_response.get('conversation_id')
                print(f"File uploaded successfully: {upload_response.get('uploaded_files', [])}")
            except Exception as e:
                print(f"Error uploading file: {e}")
        else:
            print(f"File not found: {file_path}")
        continue
    # Handle text input with streaming
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if not chunk_str.startswith('data: '):
                continue
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            # Update conversation_id if present
            if data.get('conversation_id'):
                conversation_id = data.get('conversation_id')
            # Print streaming output
            if data.get('output_value') and data.get('type') == 'stream':
                print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming
Integrations
Integration nodes accept an Integration object that includes the ID of your integration.
from vectorshift.pipeline import IntegrationGmailNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Gmail Message to Send')
integration_id = 'your integration id'
integration = Integration(object_id=integration_id)
gmail_node = IntegrationGmailNode(
    integration=integration,
    node_name="gmail_node",
    action="send_email",
    recipients="recipient@gmail.com",
    subject="Test Email from Pipeline",
    body=input_node.text,
    format="text"
)
gmail_pipeline = Pipeline.new(
    name="gmail-pipeline",
    nodes=[input_node, gmail_node]
)
To use the Slack node, specify the channel and team IDs, which are available from the Slack app.
from vectorshift.pipeline import IntegrationSlackNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Slack Message to Send')
integration_id = 'your_integration_id'
slack_node = IntegrationSlackNode(
    node_name="slack_node",
    integration=Integration(object_id=integration_id),
    action='send_message',
    channel='your_channel_id',
    message=input_node.text,
    team='your_team_id'
)
slack_pipeline = Pipeline.new(
    name='slack-pipeline',
    nodes=[input_node, slack_node]
)
Download files
Source Distribution
Built Distribution
File details
Details for the file vectorshift-0.1.1.tar.gz.
File metadata
- Download URL: vectorshift-0.1.1.tar.gz
- Upload date:
- Size: 1.4 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | df0a000987255dc91d488cb39465294833178184c58c4996ea92242f521b5154 |
| MD5 | 4218a11059fa105a771b4f008ecaea7a |
| BLAKE2b-256 | 131e0def420f1398c8cdc094b4ab44ed686d918dfaff7308c77e5f37ea662d9d |
File details
Details for the file vectorshift-0.1.1-py3-none-any.whl.
File metadata
- Download URL: vectorshift-0.1.1-py3-none-any.whl
- Upload date:
- Size: 1.8 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 96342a21e484e34f8e343a581c3d19c2f002b85f5fda37470a7986a3261ce081 |
| MD5 | 1b4b818ac11563ae07a7ec6512ef84f1 |
| BLAKE2b-256 | bab381ff6d3612f730b57c5f7c0763d2692f075518ff0fc4d52266cbde713eac |