
VectorShift Python SDK


VectorShift SDK

A Python SDK, currently in development, for creating and interacting with VectorShift (VS) pipelines.

Documentation

The VectorShift SDK provides a Python interface for creating, managing, and executing AI pipelines on the VectorShift platform.

For comprehensive API documentation, visit: https://docs.vectorshift.ai/api-reference/overview

Installation

pip install vectorshift

Usage

Authentication

Set the API key in code:

import vectorshift
vectorshift.api_key = 'sk-****'

or set it as an environment variable:

export VECTORSHIFT_API_KEY='sk-***'
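If your own scripts read the key from the environment, a small guard makes a missing key fail fast instead of surfacing later as an authentication error. This is a minimal sketch: require_api_key is a hypothetical helper (not part of the vectorshift package), and it assumes only the VECTORSHIFT_API_KEY variable name shown above.

```python
import os


def require_api_key(env_var: str = "VECTORSHIFT_API_KEY") -> str:
    """Return the API key from the environment, failing fast if it is unset.

    Hypothetical helper; not part of the vectorshift package.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(
            f"{env_var} is not set; export it or assign vectorshift.api_key in code"
        )
    return key
```

You could then write vectorshift.api_key = require_api_key() at the top of a script.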

Create a new pipeline using the pipeline builder API

import vectorshift
from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

pipeline = Pipeline.new(name="basic-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save()

The builder provides full IDE autocomplete for all 200+ node types. You can also still instantiate nodes directly:

from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

input_node = InputNode(node_name="input_0")

llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response
)

pipeline = Pipeline.new(
    name="basic-llm-pipeline",
    nodes=[input_node, llm_node, output_node]
)

Basic RAG Pipeline

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, KnowledgeBaseNode, OutputNode, LlmNode
from vectorshift import KnowledgeBase

# Set API key
vectorshift.api_key = "your api key here"

# Create input node for user query
input_node = InputNode(
    node_name="Query",
)

# Fetch knowledge base
knowledge_base = KnowledgeBase.fetch(name="your knowledge base name here")

# Create knowledge base node to retrieve relevant documents
knowledge_base_node = KnowledgeBaseNode(
    query=input_node.text,
    knowledge_base=knowledge_base,
    format_context_for_llm=True,
)

# Create LLM node that uses both the query and retrieved documents
llm_node = LlmNode(
    system="You are a helpful assistant that answers questions based on the provided context documents.",
    prompt=f"Query: {input_node.text}\n\nContext: {knowledge_base_node.formatted_text}",
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

# Create output node for the LLM response
output_node = OutputNode(
    node_name="Response",
    value=llm_node.response
)

# Create the RAG pipeline
rag_pipeline = Pipeline.new(
    name="rag-pipeline",
    nodes=[input_node, knowledge_base_node, llm_node, output_node],
)

Batch Mode (List Processing)

Use pipeline.add_batch to run a node in batch mode: the node processes each item of a list input in parallel.

from vectorshift.pipeline import Pipeline

pipeline = Pipeline.new(name="batch-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="vec<string>")

llm = pipeline.add_batch(name="batch_llm").llm(
    provider="openai",
    model="gpt-4o-mini",
    prompt=inp.value,
    system="You are concise. Reply in one sentence.",
    stream=False
)

pipeline.save()

result = pipeline.run(inputs={
    "input_0": [
        "Say hello to New York",
        "Say hello to Tokyo",
        "Say hello to Lagos",
    ]
})

You can also set batch mode explicitly on any node:

from vectorshift.pipeline import InputNode, PipelineNode, SplitTextNode, OutputNode, Pipeline

sub_pipeline = Pipeline.fetch(name="your sub pipeline")

input_node = InputNode(node_name="input_0")

split_text_node = SplitTextNode(
    node_name="split_text_node",
    text=input_node.text,
    delimiter="newline"
)

pipeline_node = PipelineNode(
    pipeline_id=sub_pipeline.id,
    node_name="sub_pipeline",
    input_0=split_text_node.processed_text,
    execution_mode="batch"
)

output_node = OutputNode(node_name="output_0", value=pipeline_node.output_0)

main_pipeline = Pipeline.new(
    name="batched-pipeline",
    nodes=[input_node, split_text_node, pipeline_node, output_node]
)
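Conceptually, batch mode applies a node to every element of a list input and returns a list of results in the same order. The sketch below models that behavior in plain Python with concurrent.futures. It illustrates the semantics only and is not how VectorShift executes batches; run_batch and fake_llm are hypothetical names, with fake_llm standing in for a single (non-batched) LLM call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def run_batch(run_node: Callable[[str], str], items: List[str], max_workers: int = 4) -> List[str]:
    """Apply run_node to each item in parallel, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order,
        # even if individual calls finish out of order.
        return list(pool.map(run_node, items))


def fake_llm(prompt: str) -> str:
    # Hypothetical stand-in for one LLM call.
    return prompt.replace("Say hello to", "Hello,") + "!"


print(run_batch(fake_llm, ["Say hello to New York", "Say hello to Tokyo"]))
# ['Hello, New York!', 'Hello, Tokyo!']
```

Order preservation is the property that matters here: item i of the output corresponds to item i of the input list.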

Streaming

import json
import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>"
)

# Create and save the pipeline
pipeline = Pipeline.new(
    name="streaming-llm-pipeline-1",
    nodes=[input_node, llm_node, output_node]
)

# Run pipeline with streaming enabled
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Stream the response chunks
for chunk in pipeline.run(input_data, stream=True):
    try:
        # Each chunk is a line of the form 'data: {...}' carrying a JSON payload
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If parsing fails, just continue to next chunk
        continue
print()  # Add newline after streaming is complete
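The parsing logic inside the streaming loop can be factored into a helper so it is easier to test and reuse. This sketch assumes chunks arrive as data: {...} lines carrying output_name and output_value keys, as in the example above; parse_stream_chunk and extract_output are hypothetical names, not SDK functions.

```python
import json
from typing import Optional, Union


def parse_stream_chunk(chunk: Union[bytes, str]) -> Optional[dict]:
    """Decode one 'data: {...}' stream line into a dict, or return None.

    Returns None for non-data lines, undecodable bytes, and invalid JSON.
    """
    try:
        text = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
    except UnicodeDecodeError:
        return None
    text = text.strip()
    if not text.startswith("data: "):
        return None
    try:
        payload = json.loads(text[len("data: "):])
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None


def extract_output(chunk: Union[bytes, str], output_name: str = "output_0") -> str:
    """Return the streamed text for one named output, or '' if the chunk is not for it."""
    data = parse_stream_chunk(chunk)
    if data is None or data.get("output_name") != output_name:
        return ""
    value = data.get("output_value")
    return "" if value is None else str(value)
```

Inside the loop, print(extract_output(chunk), end="", flush=True) would then replace the whole try/except block.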

Async Usage

Asynchronous versions of the SDK methods use the same name prefixed with a (for example, run becomes arun). The example below fetches a pipeline by name, runs it asynchronously with a given input, and awaits the result.

import asyncio
import vectorshift
from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

# Fetch the pipeline synchronously, then await an asynchronous run
pipeline = Pipeline.fetch(name="your pipeline name here")

input_data = {"input_0": "Hello, how are you?"}

result = asyncio.run(pipeline.arun(input_data))
print(result)
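Because arun is awaitable, several runs can be issued concurrently with asyncio.gather. In this sketch fake_arun is a hypothetical stand-in for pipeline.arun so the snippet runs without an API key; with a real pipeline you would pass pipeline.arun instead, assuming it accepts the same input dict as above. A semaphore caps how many requests are in flight at once.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_many(
    arun: Callable[[Dict[str, Any]], Awaitable[Any]],
    inputs: List[Dict[str, Any]],
    max_concurrent: int = 4,
) -> List[Any]:
    """Run arun on each input concurrently; results come back in input order."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(payload: Dict[str, Any]) -> Any:
        async with semaphore:
            return await arun(payload)

    # gather returns results in the order its arguments were passed.
    return await asyncio.gather(*(limited(p) for p in inputs))


async def fake_arun(payload: Dict[str, Any]) -> Dict[str, str]:
    # Hypothetical stand-in for pipeline.arun: echoes the input back in upper case.
    await asyncio.sleep(0.01)
    return {"output_0": payload["input_0"].upper()}


results = asyncio.run(run_many(fake_arun, [{"input_0": "hi"}, {"input_0": "bye"}]))
print(results)  # [{'output_0': 'HI'}, {'output_0': 'BYE'}]
```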

Parallel Knowledge Base Upload

We can use the async methods to parallelize bulk uploads of the files in a directory to a knowledge base. The script below takes a knowledge base (vector store) name and a local directory to upload, and uses a semaphore to limit the number of concurrent uploads.

import asyncio
import os
import argparse
import vectorshift
from vectorshift.knowledge_base import KnowledgeBase, IndexingConfig
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

def upload_documents(vectorstore_name, upload_dir, max_concurrent=16):
    vectorshift.api_key = 'your api key here'
    vectorstore = KnowledgeBase.fetch(name=vectorstore_name)

    num_files = sum([len(files) for r, d, files in os.walk(upload_dir)])
    print(f'Number of files in the upload directory: {num_files}')
    
    async def upload_document(semaphore, script_path, document_title, dirpath):
        async with semaphore:
            try:
                # Create indexing configuration
                indexing_config = IndexingConfig(
                    chunk_size=512,
                    chunk_overlap=0,
                    file_processing_implementation='Default',
                    index_tables=False,
                    analyze_documents=False
                )
                response = await vectorstore.aindex_document(
                    document_type='file',
                    document=script_path,
                    indexing_config=indexing_config
                )
                return f"Response for {document_title} in directory {dirpath}: {response}"
            except Exception as e:
                return f"Response for {document_title} in directory {dirpath}: Failed due to {e}"

    async def upload_all_documents():
        # Create semaphore to limit concurrent uploads
        semaphore = asyncio.Semaphore(max_concurrent)
        
        all_files = []
        for dirpath, dirnames, filenames in os.walk(upload_dir):
            for script_file in filenames:
                script_path = os.path.join(dirpath, script_file)
                document_title = os.path.basename(script_path)
                all_files.append((script_path, document_title, dirpath))
        
        # Create tasks for all files
        tasks = []
        for script_path, document_title, dirpath in all_files:
            task = upload_document(semaphore, script_path, document_title, dirpath)
            tasks.append(task)
        
        # Process all tasks with progress bar
        with tqdm(total=len(all_files), desc="Uploading documents") as pbar:
            for coro in asyncio.as_completed(tasks):
                result = await coro
                if "Failed due to" in result:
                    print(f"Error: {result}")
                else:
                    print(result)
                pbar.update(1)
    
    asyncio.run(upload_all_documents())

if __name__ == "__main__":
    # Setup command line argument parsing
    parser = argparse.ArgumentParser(description='Upload documents to a VectorStore.')
    parser.add_argument('--vectorstore_name', type=str, required=True, help='Name of the VectorStore to upload documents to.')
    parser.add_argument('--upload_dir', type=str, required=True, help='Directory path of documents to upload.')
    parser.add_argument('--max_concurrent', type=int, default=16, help='Maximum number of concurrent uploads.')
    args = parser.parse_args()

    upload_documents(args.vectorstore_name, args.upload_dir, args.max_concurrent)

Pipeline Management

Fetch by Name or ID

from vectorshift.pipeline import Pipeline

# Fetch by name
pipeline = Pipeline.fetch(name="My Pipeline")

# Fetch by ID
pipeline = Pipeline.fetch(id="pipeline-id-123")

# Fetch a specific version or branch
pipeline = Pipeline.fetch(id="pipeline-id-123", version="1.2.0")
pipeline = Pipeline.fetch(id="pipeline-id-123", branch_id="branch-id")

# Fetch from another user/org
pipeline = Pipeline.fetch(name="Shared Pipeline", username="other_user", org_name="my-org")

List Pipelines

pipelines = Pipeline.list()

# With filtering and pagination
pipelines = Pipeline.list(folder_id="folder-123", include_shared=True, offset=0, limit=20)

Duplicate a Pipeline

existing_pipeline = Pipeline.fetch(name="My Pipeline")
new_pipeline = existing_pipeline.duplicate(name="My Pipeline Copy")

Version Control

The SDK supports version control with semantic versioning and deploy controls via pipeline.save().

from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")

# Save and deploy (deploy=True is the default)
pipeline.save()

# Save with a version bump
pipeline.save(bump=BumpLevel.MINOR, description="Updated model to gpt-4o")

# Save without deploying
pipeline.save(deploy=False)

BumpLevel supports PATCH, MINOR, and MAJOR semantic version bumps.
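Under standard semantic versioning, PATCH increments the last number, MINOR increments the middle number and resets the patch, and MAJOR increments the first number and resets the other two. The sketch below implements those standard rules. It assumes, but the source does not state, that BumpLevel follows them exactly; Bump is a local stand-in enum and bump_version is a hypothetical helper.

```python
from enum import Enum


class Bump(Enum):
    # Local stand-in mirroring the PATCH / MINOR / MAJOR levels named above.
    PATCH = "patch"
    MINOR = "minor"
    MAJOR = "major"


def bump_version(version: str, level: Bump) -> str:
    """Return the next MAJOR.MINOR.PATCH version string for the given bump level."""
    major, minor, patch = (int(part) for part in version.split("."))
    if level is Bump.MAJOR:
        return f"{major + 1}.0.0"
    if level is Bump.MINOR:
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"


print(bump_version("1.2.0", Bump.PATCH))  # 1.2.1
print(bump_version("1.2.0", Bump.MINOR))  # 1.3.0
print(bump_version("1.2.0", Bump.MAJOR))  # 2.0.0
```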

To update a pipeline's nodes, modify them in code and save:

from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")
pipeline.nodes.clear()

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save(bump=BumpLevel.MINOR, description="Upgraded model to gpt-4o")

Conditional Logic

Use ConditionNode to add branching logic to your pipelines.

from vectorshift.pipeline import (
    Pipeline, InputNode, OutputNode, TextNode, MergeNode,
    ConditionNode, Operator, LogicalOp, Clause, ConditionGroup,
)

score_input = InputNode(node_name="score", input_type="int32")

condition = ConditionNode(
    node_name="grade_check",
    conditions=[
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 90)],
        ),
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 50)],
        ),
    ],
)

# Each condition group creates a path output (path_0, path_1, ..., path_else)
msg_excellent = TextNode(node_name="msg_a", text="Excellent!", dependencies=[condition.path_0])
msg_passing = TextNode(node_name="msg_b", text="You passed.", dependencies=[condition.path_1])
msg_fail = TextNode(node_name="msg_c", text="Did not pass.", dependencies=[condition.path_else])

merge = MergeNode(
    node_name="merge",
    function="first",
    type="string",
    fields=[msg_excellent.text, msg_passing.text, msg_fail.text],
)

output = OutputNode(node_name="result", output_type="string", value=merge.output)

pipeline = Pipeline.new(
    name="conditional-pipeline",
    nodes=[score_input, condition, msg_excellent, msg_passing, msg_fail, merge, output],
)

Conditions support 20+ operators including comparisons (EQUAL, GREATER_THAN, etc.), text checks (TEXT_CONTAINS, TEXT_STARTS_WITH, etc.), and unary checks (IS_EMPTY, IS_TRUE, etc.). Multiple clauses can be combined with LogicalOp.AND / LogicalOp.OR.
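To reason about which path fires, it can help to model the node in plain Python. This sketch assumes that condition groups are checked in order, that the first matching group wins, and that path_else is taken when none match; it also models MergeNode(function="first") as returning the first branch that produced a value. These are assumptions about the node semantics, and every name below (route, merge_first, ClauseSpec, GroupSpec) is hypothetical rather than part of the SDK.

```python
import operator
from typing import Callable, Iterable, List, Optional, Sequence, Tuple

# Plain-Python stand-ins, not the SDK's Clause / ConditionGroup classes.
ClauseSpec = Tuple[Callable[[int, int], bool], int]  # (comparison, right-hand value)
GroupSpec = Tuple[List[ClauseSpec], Callable[[Iterable[bool]], bool]]  # (clauses, all or any)


def route(value: int, groups: Sequence[GroupSpec]) -> str:
    """Return the first path whose group matches, else 'path_else'."""
    for index, (clauses, combine) in enumerate(groups):
        # combine is all (like LogicalOp.AND) or any (like LogicalOp.OR).
        if combine(compare(value, rhs) for compare, rhs in clauses):
            return f"path_{index}"
    return "path_else"


def merge_first(*values: Optional[str]) -> Optional[str]:
    """Model of a 'first' merge: return the first value that is not None."""
    return next((v for v in values if v is not None), None)


grade_groups = [
    ([(operator.ge, 90)], all),  # path_0: score >= 90
    ([(operator.ge, 50)], all),  # path_1: score >= 50
]

messages = {"path_0": "Excellent!", "path_1": "You passed.", "path_else": "Did not pass."}
for score in (95, 70, 10):
    taken = route(score, grade_groups)
    # Only the branch on the taken path produces text; the others yield None.
    branch_outputs = [messages[p] if p == taken else None for p in ("path_0", "path_1", "path_else")]
    print(score, merge_first(*branch_outputs))
```

Because the groups are checked in order, a score of 95 also satisfies the second group but still routes to path_0, which is why the thresholds are listed from highest to lowest.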

Chatbots

Run a chatbot from your terminal. The first call passes conversation_id=None, so the chatbot starts a new conversation. Each response includes a conversation_id; passing it to the next chatbot.run call continues the same conversation, so the chatbot can see its previous responses. Type quit to exit.

from vectorshift import Chatbot
chatbot = Chatbot.fetch(name = 'your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id)
    conversation_id = response['conversation_id']
    print(response['output_message'])

Streaming Chatbot

import json
from vectorshift import Chatbot
chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    # Keep the current conversation_id; update it only if the stream supplies one
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if chunk_str.startswith('data: '):
                data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
                if data.get('conversation_id'):
                    conversation_id = data.get('conversation_id')
                if data.get('output_value') and data.get('type') == 'stream':
                    print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming is complete
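The streaming loop does two jobs at once: it tracks the latest conversation_id and prints streamed text. Moving that into a function that consumes any chunk iterator makes it testable without a live chatbot. This sketch assumes the same data: {...} chunk format and the conversation_id, output_value, and type keys used above; collect_chat_stream is a hypothetical helper, not an SDK method.

```python
import json
from typing import Iterable, List, Optional, Tuple, Union


def collect_chat_stream(
    chunks: Iterable[Union[bytes, str]],
    conversation_id: Optional[str] = None,
    echo: bool = True,
) -> Tuple[Optional[str], str]:
    """Consume a chatbot stream and return (conversation_id, full_text).

    The incoming conversation_id is kept unless the stream supplies a new one.
    """
    pieces: List[str] = []
    for chunk in chunks:
        try:
            text = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
            if not text.startswith("data: "):
                continue
            data = json.loads(text[len("data: "):])
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue
        if not isinstance(data, dict):
            continue
        if data.get("conversation_id"):
            conversation_id = data["conversation_id"]
        if data.get("type") == "stream" and data.get("output_value"):
            pieces.append(str(data["output_value"]))
            if echo:
                print(data["output_value"], end="", flush=True)
    if echo:
        print()  # Newline after the streamed reply
    return conversation_id, "".join(pieces)
```

In the loop above, conversation_id, reply = collect_chat_stream(response_stream, conversation_id) could replace the inner for loop and the trailing print().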

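Chatbot File Upload

Type add_file <path> to upload a local file into the current conversation; any other input is sent to the chatbot as a streamed text message.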

from vectorshift import Chatbot
import os
import json
chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    
    # Handle file upload
    if user_input.startswith("add_file "):
        file_path = user_input[9:]  # Remove "add_file " prefix
        if os.path.isfile(file_path):
            try:
                upload_response = chatbot.upload_files(file_paths=[file_path], conversation_id=conversation_id)
                conversation_id = upload_response.get('conversation_id')
                print(f"File uploaded successfully: {upload_response.get('uploaded_files', [])}")
            except Exception as e:
                print(f"Error uploading file: {e}")
        else:
            print(f"File not found: {file_path}")
        continue
    
    # Handle text input with streaming
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if not chunk_str.startswith('data: '):
                continue
                
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            
            # Update conversation_id if present
            if data.get('conversation_id'):
                conversation_id = data.get('conversation_id')
            
            # Print streaming output
            if data.get('output_value') and data.get('type') == 'stream':
                print(data.get('output_value', ''), end="", flush=True)
                
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    
    print()  # Add newline after streaming

Integrations

Managing Integrations

The Integration class provides methods to list, fetch, check status, and delete your integrations programmatically.

List all integrations

from vectorshift.integrations import Integration

# List all integrations
integrations = Integration.list()
for integration in integrations:
    print(f"{integration.name} ({integration.type}) - {integration.status}")

# Filter by type
slack_integrations = Integration.list(type="slack")

Fetch a specific integration

# Fetch by ID
integration = Integration.fetch(id="your_integration_id")
print(integration.name, integration.type, integration.status)

Check integration status

integration = Integration.fetch(id="your_integration_id")
status = integration.get_status()
print(f"Status: {status}")

Async support

All methods have async variants: alist(), afetch(), aget_status().

import asyncio
from vectorshift.integrations import Integration

async def main():
    integrations = await Integration.alist()
    if integrations:
        integration = await Integration.afetch(id=integrations[0].id)
        status = await integration.aget_status()
        print(f"{integration.name}: {status}")

asyncio.run(main())

Integration properties

Property           Description
id                 Unique integration identifier
name               Display name (account name)
type               Integration type (e.g., slack, gmail, google_drive)
status             Current status
created_date       When the integration was created
authorized_scopes  List of authorized OAuth scopes
allowed_actions    List of permitted actions
unhealthy          Whether the integration is in an unhealthy state
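Because each integration exposes name, type, and unhealthy, a health report is a simple filter over the results of Integration.list(). To keep the sketch runnable offline, it uses a hypothetical IntegrationInfo dataclass that mirrors a subset of the properties above; with the SDK you would pass the objects returned by Integration.list() instead, assuming they expose these attributes as documented.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List


@dataclass
class IntegrationInfo:
    # Hypothetical stand-in mirroring a subset of the Integration properties.
    id: str
    name: str
    type: str
    unhealthy: bool


def unhealthy_report(integrations: Iterable[Any]) -> List[str]:
    """Return 'name (type)' labels for integrations flagged as unhealthy."""
    return [f"{i.name} ({i.type})" for i in integrations if getattr(i, "unhealthy", False)]


sample = [
    IntegrationInfo(id="a1", name="Team Slack", type="slack", unhealthy=False),
    IntegrationInfo(id="b2", name="Support Gmail", type="gmail", unhealthy=True),
]
print(unhealthy_report(sample))  # ['Support Gmail (gmail)']
```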

Using an integration in a pipeline

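Integration nodes take an integration, passed as a dictionary via to_dict() in the examples below. You can either fetch an existing integration or construct one directly from its ID. This example sends an email through a Gmail integration: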

from vectorshift.pipeline import IntegrationGmailNode, InputNode, Pipeline
from vectorshift.integrations import Integration
input_node = InputNode(node_name="input_0", description="Gmail Message to Send")

# Fetch integration to use in a pipeline
integration = Integration.fetch(id='your_integration_id')
# Or create directly with the id
# integration = Integration(id='your_integration_id')
gmail_node = IntegrationGmailNode(
    integration = integration.to_dict(),
    node_name="gmail_node",
    action="send_email",
    recipients="recipient@gmail.com",
    subject="Test Email from Pipeline",
    body=input_node.text,
    format="text"
)

gmail_pipeline = Pipeline.new(
    name="gmail-pipeline",
    nodes=[input_node, gmail_node]
)

To use the Slack node, specify the channel ID and team ID, both of which you can find in the Slack app:

from vectorshift.pipeline import IntegrationSlackNode, InputNode, Pipeline
from vectorshift.integrations import Integration
input_node = InputNode(node_name="input_0", description = 'Slack Message to Send')


slack_node = IntegrationSlackNode(
    node_name="slack_node",
    integration = Integration(id='your_integration_id').to_dict(),
    action = 'send_message',
    channel = 'your_channel_id',
    message = input_node.text,
    team = 'your_team_id'
)

slack_pipeline = Pipeline.new(
    name = 'slack-pipeline',
    nodes = [input_node, slack_node]
)


Download files


Source Distribution

vectorshift-0.1.7.tar.gz (2.1 MB)

Uploaded Source

Built Distribution


vectorshift-0.1.7-py3-none-any.whl (2.6 MB)

Uploaded Python 3

File details

Details for the file vectorshift-0.1.7.tar.gz.

File metadata

  • Download URL: vectorshift-0.1.7.tar.gz
  • Upload date:
  • Size: 2.1 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for vectorshift-0.1.7.tar.gz
Algorithm Hash digest
SHA256 4499e78e087c9b867cc0e4e9df2d18556fa8c28740dc235938f08943ab4d0500
MD5 144d243814899f2f29b2ce25cc490cac
BLAKE2b-256 3ea6a52907bc77525adf8449165645fcc5a532d57ff9acb35f78f5813d16d682


File details

Details for the file vectorshift-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: vectorshift-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 2.6 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for vectorshift-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 05bf4c6d402650a4027305aa76e3a1feb6255147351b57924bbc85fefd2691e7
MD5 087ac26e1a68f0387ebf8760bd085fbf
BLAKE2b-256 6cf1bbc852a57ac7a18b584748b3b2fe606cb3ccf83f69ce30d998817835955b

