VectorShift SDK

Python SDK (in development) for creating and interacting with VectorShift pipelines.

Documentation

The VectorShift SDK provides a Python interface for creating, managing, and executing AI pipelines on the VectorShift platform.

For comprehensive API documentation, visit: https://docs.vectorshift.ai/api-reference/overview

Installation

pip install vectorshift

Usage

Authentication

Set the API key in code:

import vectorshift

vectorshift.api_key = 'sk-****'

Or set it as an environment variable:

export VECTORSHIFT_API_KEY='sk-***'
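
If you would rather fail early with a clear message when the variable is missing, a small check can run before any SDK call. The sketch below is only an illustration: load_api_key is a hypothetical helper, not part of the SDK, and it assumes the key is stored in VECTORSHIFT_API_KEY as shown above.

```python
import os


def load_api_key(env_var: str = "VECTORSHIFT_API_KEY") -> str:
    """Return the API key stored in env_var, or raise if it is missing or blank."""
    # Hypothetical helper: the SDK itself does not provide this function.
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"{env_var} is not set; export it before running.")
    return key
```

The returned value could then be assigned to vectorshift.api_key in the same way as the hard-coded key above.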

Create a new pipeline using the pipeline builder API

import vectorshift
from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

pipeline = Pipeline.new(name="basic-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save()

The builder provides full IDE autocomplete for all 200+ node types. You can also still instantiate nodes directly:

from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

input_node = InputNode(node_name="input_0")

llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response
)

pipeline = Pipeline.new(
    name="basic-llm-pipeline",
    nodes=[input_node, llm_node, output_node]
)

Basic RAG Pipeline

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, KnowledgeBaseNode, OutputNode, LlmNode
from vectorshift import KnowledgeBase

# Set API key
vectorshift.api_key = "your api key here"

# Create input node for user query
input_node = InputNode(
    node_name="Query",
)

# Fetch knowledge base
knowledge_base = KnowledgeBase.fetch(name="your knowledge base name here")

# Create knowledge base node to retrieve relevant documents
knowledge_base_node = KnowledgeBaseNode(
    query=input_node.text,
    knowledge_base=knowledge_base,
    format_context_for_llm=True,
)

# Create LLM node that uses both the query and retrieved documents
llm_node = LlmNode(
    system="You are a helpful assistant that answers questions based on the provided context documents.",
    prompt=f"Query: {input_node.text}\n\nContext: {knowledge_base_node.formatted_text}",
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

# Create output node for the LLM response
output_node = OutputNode(
    node_name="Response",
    value=llm_node.response
)

# Create the RAG pipeline
rag_pipeline = Pipeline.new(
    name="rag-pipeline",
    nodes=[input_node, knowledge_base_node, llm_node, output_node],
)

Batch Mode (List Processing)

Use pipeline.add_batch to run a node in batch mode. The node will process lists of inputs in parallel.

from vectorshift.pipeline import Pipeline

pipeline = Pipeline.new(name="batch-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="vec<string>")

llm = pipeline.add_batch(name="batch_llm").llm(
    provider="openai",
    model="gpt-4o-mini",
    prompt=inp.value,
    system="You are concise. Reply in one sentence.",
    stream=False
)

pipeline.save()

result = pipeline.run(inputs={
    "input_0": [
        "Say hello to New York",
        "Say hello to Tokyo",
        "Say hello to Lagos",
    ]
})
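
As a rough mental model only, batch mode resembles mapping one function over a list of inputs with a worker pool. The sketch below is plain Python and does not call the SDK: greet is a made-up stand-in for a single node run, and run_batch is a hypothetical helper. That results come back in input order is a property of this sketch, not a statement about the platform.

```python
from concurrent.futures import ThreadPoolExecutor


def greet(prompt: str) -> str:
    """Stand-in for one node run; a real node would call an LLM."""
    return f"[reply to: {prompt}]"


def run_batch(fn, items, max_workers=4):
    """Apply fn to every item concurrently and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))


prompts = ["Say hello to New York", "Say hello to Tokyo", "Say hello to Lagos"]
replies = run_batch(greet, prompts)
print(replies)
```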

You can also set batch mode explicitly on any node:

from vectorshift.pipeline import InputNode, PipelineNode, SplitTextNode, OutputNode, Pipeline

sub_pipeline = Pipeline.fetch(name="your sub pipeline")

input_node = InputNode(node_name="input_0")

split_text_node = SplitTextNode(
    node_name="split_text_node",
    text=input_node.text,
    delimiter="newline"
)

pipeline_node = PipelineNode(
    pipeline_id=sub_pipeline.id,
    node_name="sub_pipeline",
    input_0=split_text_node.processed_text,
    execution_mode="batch"
)

output_node = OutputNode(node_name="output_0", value=pipeline_node.output_0)

main_pipeline = Pipeline.new(
    name="batched-pipeline",
    nodes=[input_node, split_text_node, pipeline_node, output_node]
)

Streaming

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>"
)

# Create and save the pipeline
pipeline = Pipeline.new(
    name="streaming-llm-pipeline-1",
    nodes=[input_node, llm_node, output_node]
)

# Run pipeline with streaming enabled
import json  # imported before the loop so json.JSONDecodeError always resolves in the except clause

input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Stream the response chunks
for chunk in pipeline.run(input_data, stream=True):
    try:
        # Parse the chunk as a JSON line
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            json_str = chunk_str[6:]  # Remove 'data: ' prefix
            data = json.loads(json_str)
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If parsing fails, just continue to next chunk
        continue
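
The chunk handling in the loop above can be pulled into a small function, which makes it easier to test without a live pipeline. This is a sketch under assumptions: parse_stream_chunk is a hypothetical helper, and the sample chunks are made-up values shaped like the "data: " lines the loop expects.

```python
import json
from typing import Optional


def parse_stream_chunk(chunk) -> Optional[dict]:
    """Decode one streamed chunk and return its JSON object, or None if it is not a data line."""
    text = chunk.decode("utf-8", errors="replace") if isinstance(chunk, bytes) else str(chunk)
    text = text.strip()
    if not text.startswith("data: "):
        return None
    try:
        payload = json.loads(text[len("data: "):])
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None


# Made-up sample chunks; real chunks come from pipeline.run(..., stream=True).
chunks = [
    b'data: {"output_name": "output_0", "output_value": "Once "}',
    b": keep-alive",
    'data: {"output_name": "output_0", "output_value": "upon a time"}',
    b"data: not-json",
]
story = "".join(
    payload.get("output_value", "")
    for payload in map(parse_stream_chunk, chunks)
    if payload and payload.get("output_name") == "output_0"
)
print(story)
```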

Async Usage

Call the async SDK methods by prefixing the method name with "a" (for example, run becomes arun). The example below fetches a pipeline by name, runs it with a given input, and awaits the result.

import asyncio
import vectorshift

from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode


vectorshift.api_key = "your api key here"


pipeline = Pipeline.fetch(name="your pipeline name here")

input_data = {"input_0": "Hello, how are you?"}

result = asyncio.run(pipeline.arun(input_data))
print(result)
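
Because arun is awaitable, several runs can in principle be awaited together with asyncio.gather. The sketch below shows the pattern without network access: run_many is a hypothetical helper, and fake_arun is a stand-in whose return shape is invented for the example. In real use you would pass pipeline.arun as run_one.

```python
import asyncio


async def run_many(run_one, inputs_list):
    """Await run_one(inputs) for every item concurrently; results keep input order."""
    return await asyncio.gather(*(run_one(inputs) for inputs in inputs_list))


async def fake_arun(inputs):
    """Stand-in for pipeline.arun so the sketch runs offline (made-up output shape)."""
    await asyncio.sleep(0.01)
    return {"output_0": inputs["input_0"].upper()}


results = asyncio.run(
    run_many(fake_arun, [{"input_0": "hello"}, {"input_0": "goodbye"}])
)
print(results)
```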

Parallel Knowledge Base Upload

We can use the async methods to parallelize the bulk upload of files in a directory to a knowledge base. The script below takes a vectorstore name and a local directory to upload.

import asyncio
import os
import argparse
import vectorshift
from vectorshift.knowledge_base import KnowledgeBase, IndexingConfig
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

def upload_documents(vectorstore_name, upload_dir, max_concurrent=16):
    vectorshift.api_key = 'your api key here'
    vectorstore = KnowledgeBase.fetch(name=vectorstore_name)

    num_files = sum([len(files) for r, d, files in os.walk(upload_dir)])
    print(f'Number of files in the upload directory: {num_files}')
    
    async def upload_document(semaphore, script_path, document_title, dirpath):
        async with semaphore:
            try:
                # Create indexing configuration
                indexing_config = IndexingConfig(
                    chunk_size=512,
                    chunk_overlap=0,
                    file_processing_implementation='Default',
                    index_tables=False,
                    analyze_documents=False
                )
                response = await vectorstore.aindex_document(
                    document_type='file',
                    document=script_path,
                    indexing_config=indexing_config
                )
                return f"Response for {document_title} in directory {dirpath}: {response}"
            except Exception as e:
                return f"Response for {document_title} in directory {dirpath}: Failed due to {e}"

    async def upload_all_documents():
        # Create semaphore to limit concurrent uploads
        semaphore = asyncio.Semaphore(max_concurrent)
        
        all_files = []
        for dirpath, dirnames, filenames in os.walk(upload_dir):
            for script_file in filenames:
                script_path = os.path.join(dirpath, script_file)
                document_title = os.path.basename(script_path)
                all_files.append((script_path, document_title, dirpath))
        
        # Create tasks for all files
        tasks = []
        for script_path, document_title, dirpath in all_files:
            task = upload_document(semaphore, script_path, document_title, dirpath)
            tasks.append(task)
        
        # Process all tasks with progress bar
        with tqdm(total=len(all_files), desc="Uploading documents") as pbar:
            for coro in asyncio.as_completed(tasks):
                result = await coro
                if "Failed due to" in result:
                    print(f"Error: {result}")
                else:
                    print(result)
                pbar.update(1)
    
    asyncio.run(upload_all_documents())

if __name__ == "__main__":
    # Setup command line argument parsing
    parser = argparse.ArgumentParser(description='Upload documents to a VectorStore.')
    parser.add_argument('--vectorstore_name', type=str, required=True, help='Name of the VectorStore to upload documents to.')
    parser.add_argument('--upload_dir', type=str, required=True, help='Directory path of documents to upload.')
    parser.add_argument('--max_concurrent', type=int, default=16, help='Maximum number of concurrent uploads.')
    args = parser.parse_args()

    upload_documents(args.vectorstore_name, args.upload_dir, args.max_concurrent)
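
The semaphore is what caps the number of uploads in flight in the script above. To see that effect in isolation, the sketch below swaps the upload call for a stand-in coroutine and records the peak concurrency. bounded_gather and fake_upload are hypothetical names used only for this illustration.

```python
import asyncio


async def bounded_gather(worker, items, max_concurrent):
    """Run worker(item) for all items, with at most max_concurrent in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item):
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))


in_flight = 0
peak = 0


async def fake_upload(path):
    """Stand-in for an upload call; tracks how many calls overlap."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return f"uploaded {path}"


paths = [f"doc_{i}.txt" for i in range(10)]
results = asyncio.run(bounded_gather(fake_upload, paths, max_concurrent=3))
print(f"peak concurrency: {peak}, uploads finished: {len(results)}")
```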

Pipeline Management

Fetch by Name or ID

# Fetch by name
pipeline = Pipeline.fetch(name="My Pipeline")

# Fetch by ID
pipeline = Pipeline.fetch(id="pipeline-id-123")

# Fetch a specific version or branch
pipeline = Pipeline.fetch(id="pipeline-id-123", version="1.2.0")
pipeline = Pipeline.fetch(id="pipeline-id-123", branch_id="branch-id")

# Fetch from another user/org
pipeline = Pipeline.fetch(name="Shared Pipeline", username="other_user", org_name="my-org")

List Pipelines

pipelines = Pipeline.list()

# With filtering and pagination
pipelines = Pipeline.list(folder_id="folder-123", include_shared=True, offset=0, limit=20)
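
To walk through every page, the offset and limit parameters can be driven by a loop. The sketch below assumes that the list call returns a list and that a page shorter than limit means there is nothing left; neither is confirmed here. iter_all is a hypothetical helper, and fake_list stands in for Pipeline.list so the example runs offline.

```python
def iter_all(list_page, page_size=20):
    """Yield every item by calling list_page(offset=..., limit=...) until a short page."""
    offset = 0
    while True:
        page = list_page(offset=offset, limit=page_size)
        yield from page
        if len(page) < page_size:
            break
        offset += page_size


# Stand-in data source with 45 made-up names.
_FAKE = [f"pipeline-{i}" for i in range(45)]


def fake_list(offset=0, limit=20):
    """Return one page of the fake data, like a paginated list endpoint."""
    return _FAKE[offset:offset + limit]


names = list(iter_all(fake_list, page_size=20))
print(len(names))
```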

Duplicate a Pipeline

new_pipeline = existing_pipeline.duplicate(name="My Pipeline Copy")

Version Control

The SDK supports version control with semantic versioning and deploy controls via pipeline.save().

from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")

# Save and deploy (deploy=True is the default)
pipeline.save()

# Save with a version bump
pipeline.save(bump=BumpLevel.MINOR, description="Updated model to gpt-4o")

# Save without deploying
pipeline.save(deploy=False)

BumpLevel supports PATCH, MINOR, and MAJOR semantic version bumps.
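
For readers unfamiliar with the convention, the sketch below shows how the three bump levels change a version string under standard semantic versioning. It is not taken from the SDK: bump_version is a hypothetical function, and how the platform computes versions internally is not shown here.

```python
def bump_version(version: str, level: str) -> str:
    """Return version bumped at the given level ('patch', 'minor', or 'major')."""
    major, minor, patch = (int(part) for part in version.split("."))
    if level == "major":
        return f"{major + 1}.0.0"
    if level == "minor":
        return f"{major}.{minor + 1}.0"
    if level == "patch":
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown bump level: {level}")


print(bump_version("1.2.3", "patch"))  # 1.2.4
print(bump_version("1.2.3", "minor"))  # 1.3.0
print(bump_version("1.2.3", "major"))  # 2.0.0
```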

To update a pipeline's nodes, modify them in code and save:

from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")
pipeline.nodes.clear()

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save(bump=BumpLevel.MINOR, description="Upgraded model to gpt-4o")

Conditional Logic

Use ConditionNode to add branching logic to your pipelines.

from vectorshift.pipeline import (
    Pipeline, InputNode, OutputNode, TextNode, MergeNode,
    ConditionNode, Operator, LogicalOp, Clause, ConditionGroup,
)

score_input = InputNode(node_name="score", input_type="int32")

condition = ConditionNode(
    node_name="grade_check",
    conditions=[
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 90)],
        ),
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 50)],
        ),
    ],
)

# Each condition group creates a path output (path_0, path_1, ..., path_else)
msg_excellent = TextNode(node_name="msg_a", text="Excellent!", dependencies=[condition.path_0])
msg_passing = TextNode(node_name="msg_b", text="You passed.", dependencies=[condition.path_1])
msg_fail = TextNode(node_name="msg_c", text="Did not pass.", dependencies=[condition.path_else])

merge = MergeNode(
    node_name="merge",
    function="first",
    type="string",
    fields=[msg_excellent.text, msg_passing.text, msg_fail.text],
)

output = OutputNode(node_name="result", output_type="string", value=merge.output)

pipeline = Pipeline.new(
    name="conditional-pipeline",
    nodes=[score_input, condition, msg_excellent, msg_passing, msg_fail, merge, output],
)

Conditions support 20+ operators including comparisons (EQUAL, GREATER_THAN, etc.), text checks (TEXT_CONTAINS, TEXT_STARTS_WITH, etc.), and unary checks (IS_EMPTY, IS_TRUE, etc.). Multiple clauses can be combined with LogicalOp.AND / LogicalOp.OR.
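
The grade example can be read as an ordered if/elif/else chain. The plain-Python sketch below mirrors it under the assumption that condition groups are checked in order and the first match wins, which is not confirmed here. select_path is a hypothetical function and does not use the SDK.

```python
def select_path(score: int) -> str:
    """Return the name of the first branch whose condition holds, else 'path_else'."""
    # Assumption: groups are evaluated in order and the first match wins.
    branches = [
        ("path_0", lambda s: s >= 90),  # first ConditionGroup in the example
        ("path_1", lambda s: s >= 50),  # second ConditionGroup in the example
    ]
    for name, holds in branches:
        if holds(score):
            return name
    return "path_else"


for score in (95, 60, 10):
    print(score, select_path(score))
```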

Chatbots

Run a chatbot. The code below lets you chat with your chatbot from the terminal. Because conversation_id is None on the first run, the chatbot starts a new conversation. Passing the conversation ID returned by chatbot.run into later calls continues the same conversation, so the chatbot can see previous responses.

from vectorshift import Chatbot
chatbot = Chatbot.fetch(name = 'your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id)
    conversation_id = response['conversation_id']
    print(response['output_message'])

Streaming Chatbot

from vectorshift import Chatbot
import json

chatbot = Chatbot.fetch(name = 'your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    # Keep the current conversation_id; it is updated below whenever the stream reports one
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if chunk_str.startswith('data: '):
                json_str = chunk_str[6:]  # Remove 'data: ' prefix
                data = json.loads(json_str)
                if data.get('conversation_id'):
                    conversation_id = data.get('conversation_id')
                elif data.get('output_value') and data.get('type') == 'stream':
                    print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming is complete

Chatbot File Upload

from vectorshift import Chatbot
import os
import json
chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    
    # Handle file upload
    if user_input.startswith("add_file "):
        file_path = user_input[9:]  # Remove "add_file " prefix
        if os.path.isfile(file_path):
            try:
                upload_response = chatbot.upload_files(file_paths=[file_path], conversation_id=conversation_id)
                conversation_id = upload_response.get('conversation_id')
                print(f"File uploaded successfully: {upload_response.get('uploaded_files', [])}")
            except Exception as e:
                print(f"Error uploading file: {e}")
        else:
            print(f"File not found: {file_path}")
        continue
    
    # Handle text input with streaming
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if not chunk_str.startswith('data: '):
                continue
                
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            
            # Update conversation_id if present
            if data.get('conversation_id'):
                conversation_id = data.get('conversation_id')
            
            # Print streaming output
            if data.get('output_value') and data.get('type') == 'stream':
                print(data.get('output_value', ''), end="", flush=True)
                
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    
    print()  # Add newline after streaming

Integrations

Managing Integrations

The Integration class provides methods to list, fetch, check status, and delete your integrations programmatically.

List all integrations

from vectorshift.integrations import Integration

# List all integrations
integrations = Integration.list()
for integration in integrations:
    print(f"{integration.name} ({integration.type}) - {integration.status}")

# Filter by type
slack_integrations = Integration.list(type="slack")

Fetch a specific integration

# Fetch by ID
integration = Integration.fetch(id="your_integration_id")
print(integration.name, integration.type, integration.status)

Check integration status

integration = Integration.fetch(id="your_integration_id")
status = integration.get_status()
print(f"Status: {status}")

Async support

All methods have async variants: alist(), afetch(), aget_status().

import asyncio
from vectorshift.integrations import Integration

async def main():
    integrations = await Integration.alist()
    if integrations:
        integration = await Integration.afetch(id=integrations[0].id)
        status = await integration.aget_status()
        print(f"{integration.name}: {status}")

asyncio.run(main())

Integration properties

Property           Description
id                 Unique integration identifier
name               Display name (account name)
type               Integration type (e.g., slack, gmail, google_drive)
status             Current status
created_date       When the integration was created
authorized_scopes  List of authorized OAuth scopes
allowed_actions    List of permitted actions
unhealthy          Whether the integration is in an unhealthy state
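
These properties make it straightforward to scan for integrations that need attention. The sketch below uses stand-in objects with made-up values in place of real Integration instances so that it runs offline. unhealthy_integrations is a hypothetical helper that relies only on the name, type, and unhealthy properties listed above.

```python
from types import SimpleNamespace


def unhealthy_integrations(integrations):
    """Return the (name, type) of each integration flagged as unhealthy."""
    return [(item.name, item.type) for item in integrations if item.unhealthy]


# Stand-ins for the objects returned by Integration.list() (made-up values).
sample = [
    SimpleNamespace(name="Team Slack", type="slack", unhealthy=False),
    SimpleNamespace(name="Support Gmail", type="gmail", unhealthy=True),
]
print(unhealthy_integrations(sample))
```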

Using an integration in a pipeline

Integration nodes accept an integration object. You can fetch an existing integration or create one directly with the id.

from vectorshift.pipeline import IntegrationGmailNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Gmail Message to Send')

# Fetch integration to use in a pipeline
integration = Integration.fetch(id='your_integration_id')
# Or create directly with the id
# integration = Integration(id='your_integration_id')
gmail_node = IntegrationGmailNode(
    integration = integration.to_dict(),
    node_name="gmail_node",
    action="send_email",
    recipients="recipient@gmail.com",
    subject="Test Email from Pipeline",
    body=input_node.text,
    format="text"
)

gmail_pipeline = Pipeline.new(
    name="gmail-pipeline",
    nodes=[input_node, gmail_node]
)

To use the Slack node, specify the channel and team IDs, which are available from the Slack app:

from vectorshift.pipeline import IntegrationSlackNode, InputNode, Pipeline
from vectorshift.integrations import Integration
input_node = InputNode(node_name="input_0", description = 'Slack Message to Send')


slack_node = IntegrationSlackNode(
    node_name="slack_node",
    integration = Integration(id='your_integration_id').to_dict(),
    action = 'send_message',
    channel = 'your_channel_id',
    message = input_node.text,
    team = 'your_team_id'
)

slack_pipeline = Pipeline.new(
    name = 'slack-pipeline',
    nodes = [input_node, slack_node]
)

Project details

Version: 0.1.5
