
VectorShift Python SDK


A Python SDK, currently in development, for creating and interacting with VectorShift pipelines.

Documentation

The VectorShift SDK provides a Python interface for creating, managing, and executing AI pipelines on the VectorShift platform.

For comprehensive API documentation, visit: https://docs.vectorshift.ai/api-reference/overview

Installation

pip install vectorshift

Usage

Authentication

Set the API key in code:

vectorshift.api_key = 'sk-****'

or set it as an environment variable:

export VECTORSHIFT_API_KEY='sk-***'
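If you use the environment variable, a small helper can load it explicitly before assigning `vectorshift.api_key` (a minimal sketch using only the standard library; the error message is illustrative):

```python
import os

def load_api_key(env_var: str = "VECTORSHIFT_API_KEY") -> str:
    """Return the API key from the environment, failing loudly if it is missing."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"{env_var} is not set; export it or assign vectorshift.api_key directly")
    return key

# vectorshift.api_key = load_api_key()
```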

Create a new pipeline using the pipeline builder API

import vectorshift
from vectorshift.pipeline import Pipeline

vectorshift.api_key = "your api key here"

pipeline = Pipeline.new(name="basic-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save()

The builder provides full IDE autocomplete for all 200+ node types. You can also still instantiate nodes directly:

from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

input_node = InputNode(node_name="input_0")

llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response
)

pipeline = Pipeline.new(
    name="basic-llm-pipeline",
    nodes=[input_node, llm_node, output_node]
)

Basic RAG Pipeline

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, KnowledgeBaseNode, OutputNode, LlmNode
from vectorshift import KnowledgeBase

# Set API key
vectorshift.api_key = "your api key here"

# Create input node for user query
input_node = InputNode(
    node_name="Query",
)

# Fetch knowledge base
knowledge_base = KnowledgeBase.fetch(name="your knowledge base name here")

# Create knowledge base node to retrieve relevant documents
knowledge_base_node = KnowledgeBaseNode(
    query=input_node.text,
    knowledge_base=knowledge_base,
    format_context_for_llm=True,
)

# Create LLM node that uses both the query and retrieved documents
llm_node = LlmNode(
    system="You are a helpful assistant that answers questions based on the provided context documents.",
    prompt=f"Query: {input_node.text}\n\nContext: {knowledge_base_node.formatted_text}",
    provider="openai",
    model="gpt-4o-mini",
    temperature=0.7
)

# Create output node for the LLM response
output_node = OutputNode(
    node_name="Response",
    value=llm_node.response
)

# Create the RAG pipeline
rag_pipeline = Pipeline.new(
    name="rag-pipeline",
    nodes=[input_node, knowledge_base_node, llm_node, output_node],
)

Batch Mode (List Processing)

Use pipeline.add_batch to run a node in batch mode. The node will process lists of inputs in parallel.

from vectorshift.pipeline import Pipeline

pipeline = Pipeline.new(name="batch-llm-pipeline")

inp = pipeline.add(name="input_0").input(input_type="vec<string>")

llm = pipeline.add_batch(name="batch_llm").llm(
    provider="openai",
    model="gpt-4o-mini",
    prompt=inp.value,
    system="You are concise. Reply in one sentence.",
    stream=False
)

pipeline.save()

result = pipeline.run(inputs={
    "input_0": [
        "Say hello to New York",
        "Say hello to Tokyo",
        "Say hello to Lagos",
    ]
})

You can also set batch mode explicitly on any node:

from vectorshift.pipeline import InputNode, PipelineNode, SplitTextNode, OutputNode, Pipeline

sub_pipeline = Pipeline.fetch(name="your sub pipeline")

input_node = InputNode(node_name="input_0")

split_text_node = SplitTextNode(
    node_name="split_text_node",
    text=input_node.text,
    delimiter="newline"
)

pipeline_node = PipelineNode(
    pipeline_id=sub_pipeline.id,
    node_name="sub_pipeline",
    input_0=split_text_node.processed_text,
    execution_mode="batch"
)

output_node = OutputNode(node_name="output_0", value=pipeline_node.output_0)

main_pipeline = Pipeline.new(
    name="batched-pipeline",
    nodes=[input_node, split_text_node, pipeline_node, output_node]
)

Streaming

import vectorshift
from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode

# Set API key
vectorshift.api_key = 'your api key here'

# Create input node
input_node = InputNode(node_name="input_0")

# Create LLM node that will stream responses
llm_node = LlmNode(
    node_name="llm_node",
    system="You are a helpful assistant.",
    prompt=input_node.text,
    provider="openai", 
    model="gpt-4o-mini",
    temperature=0.7,
    stream=True  # Enable streaming
)

# Create output node connected to LLM response
output_node = OutputNode(
    node_name="output_0",
    value=llm_node.response,
    output_type="stream<string>"
)

# Create and save the pipeline
pipeline = Pipeline.new(
    name="streaming-llm-pipeline-1",
    nodes=[input_node, llm_node, output_node]
)

# Run pipeline with streaming enabled
input_data = {"input_0": "Tell me a story about a brave adventurer"}

# Stream the response chunks
import json

for chunk in pipeline.run(input_data, stream=True):
    try:
        # Decode the chunk and parse the server-sent-event payload
        chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
        if chunk_str.startswith('data: '):
            data = json.loads(chunk_str[6:])  # Remove the 'data: ' prefix
            if data.get('output_name') == 'output_0':
                print(data.get('output_value', ''), end="", flush=True)
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        # If parsing fails, skip to the next chunk
        continue
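The chunk handling above can be factored into a small reusable helper (a hypothetical utility, not part of the SDK) that returns the JSON payload of a `data: ` line, or `None` for anything unparsable:

```python
import json
from typing import Optional

def parse_sse_chunk(chunk) -> Optional[dict]:
    """Decode a streamed chunk and return its JSON payload, or None."""
    try:
        text = chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk)
        if not text.startswith("data: "):
            return None
        return json.loads(text[len("data: "):])
    except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
        return None
```

The streaming loop then reduces to a `None` check plus a lookup of `output_name` on each payload.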

Async Usage

Call the async SDK methods by prefixing the synchronous method name with a (for example, run becomes arun). Here we fetch a pipeline by name, run it with a particular input, and await the result.

import asyncio
import vectorshift

from vectorshift.pipeline import Pipeline, InputNode, OutputNode, LlmNode


vectorshift.api_key = "your api key here"


pipeline = Pipeline.fetch(name="your pipeline name here")

input_data = {"input_0": "Hello, how are you?"}

result = asyncio.run(pipeline.arun(input_data))
print(result)
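Since arun is a coroutine, several runs can execute concurrently with asyncio.gather. The sketch below is self-contained: fake_run is a stand-in for pipeline.arun, which you would substitute in practice:

```python
import asyncio

async def fake_run(inputs: dict) -> dict:
    # Stand-in for pipeline.arun(inputs); swap in the real call.
    await asyncio.sleep(0)
    return {"echo": inputs["input_0"]}

async def run_many(batches: list) -> list:
    # Launch all runs concurrently; results come back in input order.
    return await asyncio.gather(*(fake_run(b) for b in batches))

results = asyncio.run(run_many([{"input_0": "a"}, {"input_0": "b"}]))
```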

Parallel Knowledge Base Upload

We can use the async methods to parallelize bulk upload of the files in a directory to a knowledge base. The script below takes a vectorstore name and a local directory to upload.

import asyncio
import os
import argparse
import vectorshift
from vectorshift.knowledge_base import KnowledgeBase, IndexingConfig
from dotenv import load_dotenv
from tqdm import tqdm

load_dotenv()

def upload_documents(vectorstore_name, upload_dir, max_concurrent=16):
    vectorshift.api_key = os.getenv('VECTORSHIFT_API_KEY')  # loaded from .env via load_dotenv
    vectorstore = KnowledgeBase.fetch(name=vectorstore_name)

    num_files = sum([len(files) for r, d, files in os.walk(upload_dir)])
    print(f'Number of files in the upload directory: {num_files}')
    
    async def upload_document(semaphore, script_path, document_title, dirpath):
        async with semaphore:
            try:
                # Create indexing configuration
                indexing_config = IndexingConfig(
                    chunk_size=512,
                    chunk_overlap=0,
                    file_processing_implementation='Default',
                    index_tables=False,
                    analyze_documents=False
                )
                response = await vectorstore.aindex_document(
                    document_type='file',
                    document=script_path,
                    indexing_config=indexing_config
                )
                return f"Response for {document_title} in directory {dirpath}: {response}"
            except Exception as e:
                return f"Response for {document_title} in directory {dirpath}: Failed due to {e}"

    async def upload_all_documents():
        # Create semaphore to limit concurrent uploads
        semaphore = asyncio.Semaphore(max_concurrent)
        
        all_files = []
        for dirpath, dirnames, filenames in os.walk(upload_dir):
            for script_file in filenames:
                script_path = os.path.join(dirpath, script_file)
                document_title = os.path.basename(script_path)
                all_files.append((script_path, document_title, dirpath))
        
        # Create tasks for all files
        tasks = []
        for script_path, document_title, dirpath in all_files:
            task = upload_document(semaphore, script_path, document_title, dirpath)
            tasks.append(task)
        
        # Process all tasks with progress bar
        with tqdm(total=len(all_files), desc="Uploading documents") as pbar:
            for coro in asyncio.as_completed(tasks):
                result = await coro
                if "Failed due to" in result:
                    print(f"Error: {result}")
                else:
                    print(result)
                pbar.update(1)
    
    asyncio.run(upload_all_documents())

if __name__ == "__main__":
    # Setup command line argument parsing
    parser = argparse.ArgumentParser(description='Upload documents to a VectorStore.')
    parser.add_argument('--vectorstore_name', type=str, required=True, help='Name of the VectorStore to upload documents to.')
    parser.add_argument('--upload_dir', type=str, required=True, help='Directory path of documents to upload.')
    parser.add_argument('--max_concurrent', type=int, default=16, help='Maximum number of concurrent uploads.')
    args = parser.parse_args()

    upload_documents(args.vectorstore_name, args.upload_dir, args.max_concurrent)

Pipeline Management

Fetch by Name or ID

# Fetch by name
pipeline = Pipeline.fetch(name="My Pipeline")

# Fetch by ID
pipeline = Pipeline.fetch(id="pipeline-id-123")

# Fetch a specific version or branch
pipeline = Pipeline.fetch(id="pipeline-id-123", version="1.2.0")
pipeline = Pipeline.fetch(id="pipeline-id-123", branch_id="branch-id")

# Fetch from another user/org
pipeline = Pipeline.fetch(name="Shared Pipeline", username="other_user", org_name="my-org")

List Pipelines

pipelines = Pipeline.list()

# With filtering and pagination
pipelines = Pipeline.list(folder_id="folder-123", include_shared=True, offset=0, limit=20)
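If you want every pipeline rather than one page, offset/limit can drive a simple pagination loop. A generic sketch (fake_list stands in for Pipeline.list; stopping on a short page is an assumption about the API's paging behavior):

```python
def paginate(fetch_page, limit=20):
    """Yield items from successive pages until a short page signals the end."""
    offset = 0
    while True:
        page = fetch_page(offset=offset, limit=limit)
        yield from page
        if len(page) < limit:
            break
        offset += limit

# In-memory stand-in for Pipeline.list:
data = [f"pipeline-{i}" for i in range(45)]
fake_list = lambda offset, limit: data[offset:offset + limit]
all_items = list(paginate(fake_list, limit=20))
```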

Duplicate a Pipeline

new_pipeline = existing_pipeline.duplicate(name="My Pipeline Copy")

Version Control

The SDK supports version control with semantic versioning and deploy controls via pipeline.save().

from vectorshift.pipeline import Pipeline, BumpLevel

pipeline = Pipeline.fetch(name="basic-llm-pipeline")

# Save and deploy (deploy=True is the default)
pipeline.save()

# Save with a version bump
pipeline.save(bump=BumpLevel.MINOR, description="Updated model to gpt-4o")

# Save without deploying
pipeline.save(deploy=False)

BumpLevel supports PATCH, MINOR, and MAJOR semantic version bumps.
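As a plain-Python illustration of those semantics (not an SDK API): PATCH increments the last component, MINOR increments the middle component and resets patch, and MAJOR increments the first component and resets the rest:

```python
def bump(version: str, level: str) -> str:
    """Apply a semantic-version bump to a 'major.minor.patch' string."""
    major, minor, patch = (int(p) for p in version.split("."))
    if level == "MAJOR":
        return f"{major + 1}.0.0"
    if level == "MINOR":
        return f"{major}.{minor + 1}.0"
    return f"{major}.{minor}.{patch + 1}"  # PATCH
```

For example, bump("1.2.0", "MINOR") yields "1.3.0".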

To update a pipeline's nodes, modify them in code and save:

from vectorshift.pipeline import Pipeline

pipeline = Pipeline.fetch(name="basic-llm-pipeline")
pipeline.nodes.clear()

inp = pipeline.add(name="input_0").input(input_type="string")

llm = pipeline.add(name="llm_node").llm(
    system="You are a helpful assistant.",
    prompt=inp.text,
    provider="openai",
    model="gpt-4o",
    temperature=0.7
)

out = pipeline.add(name="output_0").output(value=llm.response)

pipeline.save(bump=BumpLevel.MINOR, description="Upgraded model to gpt-4o")

Conditional Logic

Use ConditionNode to add branching logic to your pipelines.

from vectorshift.pipeline import (
    Pipeline, InputNode, OutputNode, TextNode, MergeNode,
    ConditionNode, Operator, LogicalOp, Clause, ConditionGroup,
)

score_input = InputNode(node_name="score", input_type="int32")

condition = ConditionNode(
    node_name="grade_check",
    conditions=[
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 90)],
        ),
        ConditionGroup(
            [Clause(Operator.GREATER_THAN_EQUAL, score_input.value, 50)],
        ),
    ],
)

# Each condition group creates a path output (path_0, path_1, ..., path_else)
msg_excellent = TextNode(node_name="msg_a", text="Excellent!", dependencies=[condition.path_0])
msg_passing = TextNode(node_name="msg_b", text="You passed.", dependencies=[condition.path_1])
msg_fail = TextNode(node_name="msg_c", text="Did not pass.", dependencies=[condition.path_else])

merge = MergeNode(
    node_name="merge",
    function="first",
    type="string",
    fields=[msg_excellent.text, msg_passing.text, msg_fail.text],
)

output = OutputNode(node_name="result", output_type="string", value=merge.output)

pipeline = Pipeline.new(
    name="conditional-pipeline",
    nodes=[score_input, condition, msg_excellent, msg_passing, msg_fail, merge, output],
)

Conditions support 20+ operators including comparisons (EQUAL, GREATER_THAN, etc.), text checks (TEXT_CONTAINS, TEXT_STARTS_WITH, etc.), and unary checks (IS_EMPTY, IS_TRUE, etc.). Multiple clauses can be combined with LogicalOp.AND / LogicalOp.OR.
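To make the path selection concrete, here is a plain-Python mirror of the grade_check node above (an illustration of the first-match-wins ordering implied by the path numbering, not SDK code):

```python
def select_path(score: int) -> str:
    # Groups are checked in order; the first match selects the path,
    # otherwise the else path fires.
    groups = [
        ("path_0", lambda s: s >= 90),  # Clause(GREATER_THAN_EQUAL, score, 90)
        ("path_1", lambda s: s >= 50),  # Clause(GREATER_THAN_EQUAL, score, 50)
    ]
    for path, check in groups:
        if check(score):
            return path
    return "path_else"
```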

Chatbots

Run a chatbot. This code lets you chat with your chatbot in the terminal. Since we pass conversation_id=None on the initial run, the chatbot starts a new conversation. Note how, by passing back the conversation id returned by chatbot.run, we continue the conversation and the chatbot sees its previous responses.

from vectorshift import Chatbot

chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id)
    conversation_id = response['conversation_id']
    print(response['output_message'])

Streaming Chatbot

import json

from vectorshift import Chatbot

chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    conversation_id = None
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if chunk_str.startswith('data: '):
                data = json.loads(chunk_str[6:])  # Remove the 'data: ' prefix
                if data.get('conversation_id'):
                    conversation_id = data.get('conversation_id')
                elif data.get('output_value') and data.get('type') == 'stream':
                    print(data.get('output_value', ''), end="", flush=True)
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    print()  # Add newline after streaming is complete

Chatbot File Upload

from vectorshift import Chatbot
import os
import json
chatbot = Chatbot.fetch(name='your chatbot name')

conversation_id = None
while True:
    user_input = input("User: ")
    if user_input.lower() == "quit":
        break
    
    # Handle file upload
    if user_input.startswith("add_file "):
        file_path = user_input[9:]  # Remove "add_file " prefix
        if os.path.isfile(file_path):
            try:
                upload_response = chatbot.upload_files(file_paths=[file_path], conversation_id=conversation_id)
                conversation_id = upload_response.get('conversation_id')
                print(f"File uploaded successfully: {upload_response.get('uploaded_files', [])}")
            except Exception as e:
                print(f"Error uploading file: {e}")
        else:
            print(f"File not found: {file_path}")
        continue
    
    # Handle text input with streaming
    response_stream = chatbot.run(input=user_input, input_type="text", conversation_id=conversation_id, stream=True)
    
    for chunk in response_stream:
        try:
            chunk_str = chunk.decode('utf-8') if isinstance(chunk, bytes) else str(chunk)
            if not chunk_str.startswith('data: '):
                continue
                
            data = json.loads(chunk_str[6:])  # Remove 'data: ' prefix
            
            # Update conversation_id if present
            if data.get('conversation_id'):
                conversation_id = data.get('conversation_id')
            
            # Print streaming output
            if data.get('output_value') and data.get('type') == 'stream':
                print(data.get('output_value', ''), end="", flush=True)
                
        except (json.JSONDecodeError, UnicodeDecodeError, AttributeError):
            continue
    
    print()  # Add newline after streaming

Integrations

Integration nodes accept an integration object that includes the ID of your integration:

from vectorshift.pipeline import IntegrationGmailNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Gmail Message to Send')

integration_id = 'your integration id'
integration = Integration(object_id=integration_id)

gmail_node = IntegrationGmailNode(
    integration=integration,
    node_name="gmail_node",
    action="send_email",
    recipients="recipient@gmail.com",
    subject="Test Email from Pipeline",
    body=input_node.text,
    format="text"
)

gmail_pipeline = Pipeline.new(
    name="gmail-pipeline",
    nodes=[input_node, gmail_node]
)

To use the Slack node, specify the channel and team IDs, which are accessible from the Slack app:

from vectorshift.pipeline import IntegrationSlackNode, InputNode, Pipeline
from vectorshift.integrations import Integration

input_node = InputNode(node_name="input_0", description='Slack Message to Send')

integration_id = 'your_integration_id'
slack_node = IntegrationSlackNode(
    node_name="slack_node",
    integration=Integration(object_id=integration_id),
    action='send_message',
    channel='your_channel_id',
    message=input_node.text,
    team='your_team_id'
)

slack_pipeline = Pipeline.new(
    name='slack-pipeline',
    nodes=[input_node, slack_node]
)


Download files

Download the file for your platform.

Source Distribution

vectorshift-0.1.1.tar.gz (1.4 MB)

Uploaded Source

Built Distribution


vectorshift-0.1.1-py3-none-any.whl (1.8 MB)

Uploaded Python 3

File details

Details for the file vectorshift-0.1.1.tar.gz.

File metadata

  • Download URL: vectorshift-0.1.1.tar.gz
  • Upload date:
  • Size: 1.4 MB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for vectorshift-0.1.1.tar.gz

  • SHA256: df0a000987255dc91d488cb39465294833178184c58c4996ea92242f521b5154
  • MD5: 4218a11059fa105a771b4f008ecaea7a
  • BLAKE2b-256: 131e0def420f1398c8cdc094b4ab44ed686d918dfaff7308c77e5f37ea662d9d


File details

Details for the file vectorshift-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: vectorshift-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 1.8 MB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for vectorshift-0.1.1-py3-none-any.whl

  • SHA256: 96342a21e484e34f8e343a581c3d19c2f002b85f5fda37470a7986a3261ce081
  • MD5: 1b4b818ac11563ae07a7ec6512ef84f1
  • BLAKE2b-256: bab381ff6d3612f730b57c5f7c0763d2692f075518ff0fc4d52266cbde713eac

