
Python library for storing vector data in Postgres


Timescale Vector

PostgreSQL++ for AI Applications.

  • Sign up for Timescale Vector: Get 90 days free to try Timescale Vector on the Timescale cloud data platform. There is no self-managed version at this time.
  • Documentation: Learn the key features of Timescale Vector and how to use them.
  • Getting Started Tutorial: Learn how to use Timescale Vector for semantic search on a real world dataset.
  • Learn more: Learn more about Timescale Vector, how it works and why we built it.

If you prefer to use an LLM development or data framework, see Timescale Vector’s integrations with LangChain and LlamaIndex.

Install

pip install timescale_vector

Basic Usage

Load your Postgres credentials. The safest way is with a .env file:

from dotenv import load_dotenv, find_dotenv
import os
_ = load_dotenv(find_dotenv(), override=True) 
service_url  = os.environ['TIMESCALE_SERVICE_URL']
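As a rough sketch of the step above, the snippet below checks that a loaded value looks like a Postgres connection URL before it is handed to the client. The helper name check_service_url and the placeholder URL are illustrative assumptions; neither is part of timescale_vector.

```python
from urllib.parse import urlparse

def check_service_url(url: str) -> str:
    """Return url unchanged if it looks like a Postgres URL, else raise ValueError."""
    parsed = urlparse(url)
    if parsed.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"unexpected scheme: {parsed.scheme!r}")
    if not parsed.hostname:
        raise ValueError("connection URL has no host")
    return url

# Placeholder value (assumption); in practice this would be
# os.environ['TIMESCALE_SERVICE_URL'] as loaded above.
example_url = "postgres://user:password@example.com:5432/dbname"
checked = check_service_url(example_url)
```

Failing early here gives a clearer error than a connection failure later on.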

Next, create the client.

This takes three arguments:

  • A connection string

  • The name of the collection

  • Number of dimensions

In this tutorial, we use the async client, but a sync client with an almost identical interface is also available.

from timescale_vector import client
vec  = client.Async(service_url, "my_data", 2)

Next, create the tables for the collection:

await vec.create_tables()

Next, insert some data. The data record contains:

  • A uuid to uniquely identify the embedding
  • A json blob of metadata about the embedding
  • The text the embedding represents
  • The embedding itself

Because this data already includes uuids, we only allow upserts:

import uuid
await vec.upsert([
    (uuid.uuid4(), '''{"animal":"fox"}''', "the brown fox", [1.0,1.3]),
    (uuid.uuid4(), '''{"animal":"fox", "action":"jump"}''', "jumped over the", [1.0,10.8]),
])

Now you can query for similar items:

await vec.search([1.0, 9.0])
[<Record id=UUID('d10dc66f-92d5-4296-a702-1690860bbe55') metadata={'action': 'jump', 'animal': 'fox'} contents='jumped over the' embedding=array([ 1. , 10.8], dtype=float32) distance=0.00016793422934946456>,
 <Record id=UUID('06153343-9085-4844-ad7a-b5cbed912053') metadata={'animal': 'fox'} contents='the brown fox' embedding=array([1. , 1.3], dtype=float32) distance=0.14489260377438218>]
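The distance values in the output above are consistent with cosine distance (one minus cosine similarity). This is an inference from the numbers shown, not something the text states, so treat the following as a sketch for checking the results by hand rather than a description of the library's internals.

```python
from math import sqrt

def cosine_distance(a, b):
    """1 - cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = sqrt(sum(x * x for x in a))
    norm_b = sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

query = [1.0, 9.0]
d_jump = cosine_distance(query, [1.0, 10.8])  # "jumped over the"
d_fox = cosine_distance(query, [1.0, 1.3])    # "the brown fox"
print(d_jump, d_fox)
```

Both values agree with the search output to within floating-point precision, which is why "jumped over the" is ranked first.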

You can specify the number of records to return.

await vec.search([1.0, 9.0], limit=1)
[<Record id=UUID('d10dc66f-92d5-4296-a702-1690860bbe55') metadata={'action': 'jump', 'animal': 'fox'} contents='jumped over the' embedding=array([ 1. , 10.8], dtype=float32) distance=0.00016793422934946456>]

You can also specify a filter on the metadata as a simple dictionary:

await vec.search([1.0, 9.0], limit=1, filter={"action": "jump"})
[<Record id=UUID('d10dc66f-92d5-4296-a702-1690860bbe55') metadata={'action': 'jump', 'animal': 'fox'} contents='jumped over the' embedding=array([ 1. , 10.8], dtype=float32) distance=0.00016793422934946456>]

You can also specify a list of filter dictionaries, where an item is returned if it matches any of the dictionaries:

await vec.search([1.0, 9.0], limit=2, filter=[{"action": "jump"}, {"animal": "fox"}])
[<Record id=UUID('d10dc66f-92d5-4296-a702-1690860bbe55') metadata={'action': 'jump', 'animal': 'fox'} contents='jumped over the' embedding=array([ 1. , 10.8], dtype=float32) distance=0.00016793422934946456>,
 <Record id=UUID('06153343-9085-4844-ad7a-b5cbed912053') metadata={'animal': 'fox'} contents='the brown fox' embedding=array([1. , 1.3], dtype=float32) distance=0.14489260377438218>]
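The filter behaviour shown above can be modelled in plain Python. This sketch assumes a dictionary filter matches when every key/value pair in it is present in the record's metadata, and a list of dictionaries matches when any one of them does. The function name matches is hypothetical and the containment rule is an assumption based on the outputs above.

```python
def matches(metadata, flt):
    """Return True if metadata satisfies the filter (assumed semantics)."""
    if isinstance(flt, dict):
        # Assumed: every key/value pair in the filter must appear in the metadata.
        return all(k in metadata and metadata[k] == v for k, v in flt.items())
    # A list of dicts matches if any single dict matches.
    return any(matches(metadata, f) for f in flt)

fox = {"animal": "fox"}
jumping_fox = {"animal": "fox", "action": "jump"}

only_jump = [m for m in (fox, jumping_fox) if matches(m, {"action": "jump"})]
either = [m for m in (fox, jumping_fox)
          if matches(m, [{"action": "jump"}, {"animal": "fox"}])]
```

With the two example records, the single-dictionary filter keeps only the jumping fox, while the list filter keeps both, mirroring the two search results above.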

You can access the fields as follows:

records = await vec.search([1.0, 9.0], limit=1, filter={"action": "jump"})
records[0][client.SEARCH_RESULT_ID_IDX]
UUID('d10dc66f-92d5-4296-a702-1690860bbe55')
records[0][client.SEARCH_RESULT_METADATA_IDX]
{'action': 'jump', 'animal': 'fox'}
records[0][client.SEARCH_RESULT_CONTENTS_IDX]
'jumped over the'
records[0][client.SEARCH_RESULT_EMBEDDING_IDX]
array([ 1. , 10.8], dtype=float32)
records[0][client.SEARCH_RESULT_DISTANCE_IDX]
0.00016793422934946456

You can delete by ID:

await vec.delete_by_ids([records[0][client.SEARCH_RESULT_ID_IDX]])
[]

Or you can delete by metadata filters:

await vec.delete_by_metadata({"action": "jump"})
[]

To delete all records use:

await vec.delete_all()

Advanced Usage

Indexing

Indexing speeds up queries over your data.

By default, we set up indexes to query your data by the uuid and the metadata.

If you have many rows, you also need to set up an index on the embedding. You can create a timescale-vector index on the table with:

await vec.create_embedding_index(client.TimescaleVectorIndex())

Please see the TimescaleVectorIndex documentation for advanced options. You can drop the index with:

await vec.drop_embedding_index()

While we recommend the timescale-vector index type, two more index types are available, ivfflat and HNSW.

Usage examples below:

await vec.create_embedding_index(client.IvfflatIndex())
await vec.drop_embedding_index()
await vec.create_embedding_index(client.HNSWIndex())
await vec.drop_embedding_index()

Please note that it is very important to create the ivfflat index only after you have data in the table.

Please note the community is actively working on new indexing methods for embeddings. As they become available, we will add them to our client as well.

Time-partitioning

In many use cases with large numbers of embeddings, time is an important component associated with each embedding. For example, when embedding news stories, you often search by time as well as similarity (e.g., stories related to bitcoin in the past week, or stories about Clinton in November 2016).

Yet, traditionally, searching by two components, “similarity” and “time”, is challenging for approximate nearest neighbor (ANN) indexes and makes the similarity-search index less effective.

One approach to solving this is partitioning the data by time and creating ANN indexes on each partition individually. Then, during search you can:

  • Step 1: filter out partitions that don’t match the time predicate
  • Step 2: perform the similarity search on all matching partitions
  • Step 3: combine all the results from each partition in step 2, rerank, and filter out results by time.

Step 1 makes the search a lot more efficient by filtering out whole swaths of data in one go.
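The three steps can be sketched in plain Python as follows. This is an in-memory illustration only: a brute-force scan with cosine distance stands in for the per-partition ANN index, and the names partition_key, build_partitions, and search are hypothetical, not part of timescale_vector or TimescaleDB.

```python
from datetime import datetime, timedelta
from math import sqrt

EPOCH = datetime(1970, 1, 1)

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return 1.0 - dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))

def partition_key(ts, interval):
    # Index of the fixed-width time bucket that contains ts.
    return (ts - EPOCH) // interval

def build_partitions(rows, interval):
    partitions = {}
    for row in rows:  # row = (timestamp, text, embedding)
        partitions.setdefault(partition_key(row[0], interval), []).append(row)
    return partitions

def search(partitions, interval, query, start, end, limit):
    first, last = partition_key(start, interval), partition_key(end, interval)
    candidates = []
    for key, rows in partitions.items():
        if not first <= key <= last:
            continue  # Step 1: skip partitions outside the time range
        # Step 2: similarity search within the partition (brute force here)
        candidates.extend(sorted(rows, key=lambda r: cosine_distance(query, r[2]))[:limit])
    # Step 3: combine, filter by the exact time range, and rerank
    hits = [r for r in candidates if start <= r[0] <= end]
    hits.sort(key=lambda r: cosine_distance(query, r[2]))
    return hits[:limit]

rows = [
    (datetime(2018, 8, 10, 15, 30), "the brown fox", [1.0, 1.2]),
    (datetime(2018, 8, 10, 20, 0), "jumped over the", [1.0, 10.8]),
    (datetime(2017, 1, 1), "lazy dog", [1.0, 9.0]),  # best match, but out of range
]
interval = timedelta(hours=6)
partitions = build_partitions(rows, interval)
hits = search(partitions, interval, [1.0, 9.0],
              datetime(2018, 8, 10), datetime(2018, 8, 11), limit=2)
```

The 2017 row is identical to the query vector, yet its partition is never scanned because it fails the time predicate in step 1.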

Timescale-vector supports time partitioning using TimescaleDB’s hypertables. To use this feature, simply indicate the length in time for each partition when creating the client:

from datetime import timedelta
from datetime import datetime
vec = client.Async(service_url, "my_data_with_time_partition", 2, time_partition_interval=timedelta(hours=6))
await vec.create_tables()

Then insert data where the IDs are version 1 UUIDs and the time component of the UUID specifies the time of the embedding. For example, to create an embedding for the current time, simply do:

id = uuid.uuid1()
await vec.upsert([(id, {"key": "val"}, "the brown fox", [1.0, 1.2])])

To insert data for a specific time in the past, create the uuid using our uuid_from_time function:

specific_datetime = datetime(2018, 8, 10, 15, 30, 0)
await vec.upsert([(client.uuid_from_time(specific_datetime), {"key": "val"}, "the brown fox", [1.0, 1.2])])
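To make the "time component" of a version 1 UUID concrete, here is a standard-library sketch that packs a datetime into a version 1 UUID and reads it back. The version 1 timestamp counts 100-nanosecond intervals since 1582-10-15. The function names below are hypothetical, this is not the implementation of client.uuid_from_time, and treating naive datetimes as UTC and zeroing the clock sequence and node are assumptions made for illustration.

```python
import uuid
from datetime import datetime, timedelta, timezone

# Epoch of version 1 UUID timestamps (start of the Gregorian calendar).
GREGORIAN_EPOCH = datetime(1582, 10, 15, tzinfo=timezone.utc)

def uuid_v1_from_datetime(dt: datetime) -> uuid.UUID:
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    delta = dt - GREGORIAN_EPOCH
    # Number of 100-nanosecond ticks since the Gregorian epoch.
    ticks = (delta.days * 86400 + delta.seconds) * 10_000_000 + delta.microseconds * 10
    time_low = ticks & 0xFFFFFFFF
    time_mid = (ticks >> 32) & 0xFFFF
    time_hi = (ticks >> 48) & 0x0FFF
    # Clock sequence and node are set to zero here purely for illustration.
    return uuid.UUID(fields=(time_low, time_mid, time_hi, 0, 0, 0), version=1)

def datetime_from_uuid_v1(u: uuid.UUID) -> datetime:
    # UUID.time is the 60-bit timestamp in 100-nanosecond ticks.
    return GREGORIAN_EPOCH + timedelta(microseconds=u.time // 10)

moment = datetime(2018, 8, 10, 15, 30, 0, tzinfo=timezone.utc)
example_id = uuid_v1_from_datetime(moment)
recovered = datetime_from_uuid_v1(example_id)
```

Because the timestamp can be recovered from the ID alone, a time range filter can be applied without storing a separate time column.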

You can then query the data by specifying a uuid_time_filter in the search call:

rec = await vec.search([1.0, 2.0], limit=4, uuid_time_filter=client.UUIDTimeRange(specific_datetime-timedelta(days=7), specific_datetime+timedelta(days=7)))

PgVectorize

PgVectorize enables you to create vector embeddings from any data that you already have stored in Postgres. Simply attach PgVectorize to any Postgres table, and it will automatically sync that table’s data with a set of embeddings stored in Timescale Vector. For example, let’s say you have a blog table defined in the following way:

import psycopg2
from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client, pgvectorizer
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta
with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS blog (
            id              SERIAL PRIMARY KEY NOT NULL,
            title           TEXT NOT NULL,
            author          TEXT NOT NULL,
            contents        TEXT NOT NULL,
            category        TEXT NOT NULL,
            published_time  TIMESTAMPTZ NULL --NULL if not yet published
        );
        ''')

You can insert some data as follows:

with psycopg2.connect(service_url) as conn:
    with conn.cursor() as cursor:
        cursor.execute('''
            INSERT INTO blog (title, author, contents, category, published_time) VALUES ('First Post', 'Matvey Arye', 'some super interesting content about cats.', 'AI', '2021-01-01');
        ''')

Now, say you want to embed these blogs in Timescale Vector. First, you need to define an embed_and_write function that takes a set of blog posts, creates the embeddings, and writes them into Timescale Vector. For example, if using LangChain, it could look something like the following:

def get_document(blog):
    text_splitter = CharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    docs = []
    for chunk in text_splitter.split_text(blog['contents']):
        content = f"Author {blog['author']}, title: {blog['title']}, contents:{chunk}"
        metadata = {
            "id": str(client.uuid_from_time(blog['published_time'])),
            "blog_id": blog['id'], 
            "author": blog['author'], 
            "category": blog['category'],
            "published_time": blog['published_time'].isoformat(),
        }
        docs.append(Document(page_content=content, metadata=metadata))
    return docs

def embed_and_write(blog_instances, vectorizer):
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name="blog_embedding",
        service_url=service_url,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # delete old embeddings for all ids in the work queue. locked_id is a special column that is set to the primary key of the table being
    # embedded. For items that are deleted, it is the only key that is set.
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # skip blogs that are not published yet, or are deleted (in which case it will be NULL)
        if blog['published_time'] is not None:
            documents.extend(get_document(blog))

    if len(documents) == 0:
        return
    
    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)

Then, all you have to do is run the following code in a scheduled job (cron job, lambda job, etc):

vectorizer = pgvectorizer.Vectorize(service_url, 'blog')
while vectorizer.process(embed_and_write) > 0:
    pass

Every time that job runs, it will sync the table with your embeddings. It will sync all inserts, updates, and deletes to an embeddings table called blog_embedding.

Now, you can simply search the embeddings as follows (again, using LangChain in the example):
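The drain-the-queue loop and the delete-then-rewrite pattern can be illustrated without a database. In this sketch, InMemoryWorkQueue is a hypothetical stand-in for pgvectorizer.Vectorize, assumed to hand out pending rows in batches and return how many it processed; the dictionary store stands in for the embeddings table. None of this reflects the library's actual implementation.

```python
class InMemoryWorkQueue:
    """Hypothetical stand-in for pgvectorizer.Vectorize (illustration only)."""

    def __init__(self, pending, batch_size=2):
        self.pending = list(pending)
        self.batch_size = batch_size

    def process(self, embed_and_write):
        # Take the next batch off the queue and pass it to the callback.
        batch = self.pending[:self.batch_size]
        self.pending = self.pending[self.batch_size:]
        if batch:
            embed_and_write(batch, self)
        return len(batch)  # 0 signals that the queue is empty

store = {1: "stale text"}  # blog_id -> contents, stands in for blog_embedding

def embed_and_write(blog_instances, vectorizer):
    for blog in blog_instances:
        # Always delete the old entry first, keyed by locked_id.
        store.pop(blog["locked_id"], None)
        # Re-insert only rows that are published (deleted rows have None here).
        if blog["published_time"] is not None:
            store[blog["locked_id"]] = blog["contents"]

queue = InMemoryWorkQueue([
    {"locked_id": 1, "published_time": "2021-01-01", "contents": "cats"},
    {"locked_id": 2, "published_time": None, "contents": "draft"},
    {"locked_id": 3, "published_time": "2021-02-01", "contents": "dogs"},
])

batches = 0
while queue.process(embed_and_write) > 0:
    batches += 1
```

Deleting before writing is what makes updates and deletes converge: an updated row replaces its stale entry, and an unpublished or deleted row simply disappears from the store.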

embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
    collection_name="blog_embedding",
    service_url=service_url,
    embedding=embedding,
    time_partition_interval=timedelta(days=30),
)

res = vector_store.similarity_search_with_score("Blogs about cats")
res
[(Document(page_content='Author Matvey Arye, title: First Post, contents:some super interesting content about cats.', metadata={'id': '4a784000-4bc4-11eb-9140-78a539e57b40', 'author': 'Matvey Arye', 'blog_id': 1, 'category': 'AI', 'published_time': '2021-01-01T00:00:00+00:00'}),
  0.12605134378941762)]

Development

This project is developed with nbdev. Please see that website for the development process.

