pyvector-rs

An experiment to integrate the power of Vector with Python!

Sending messages reliably can be quite hard

Even with something as simple as SQS, individual messages in a batch can fail while the rest succeed. So you need to detect this (and other errors), keep the failed messages in memory, and retry them with some kind of backoff. But what if your process is asked to exit before they have been sent successfully? What do you do? And how do you handle a large spike in send failures? You don't want messages to pile up and exhaust your memory, which would result in you losing all your messages. So you need some kind of disk buffer. And you need metrics around this, and logging, and the rest.

If you squint a bit, this begins to look a lot like Vector.
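
To make that bookkeeping concrete, here is a minimal sketch of the hand-rolled approach: send in batches, collect the partial failures, and retry them with capped exponential backoff. The names `send_batch` and `send_with_retries` are hypothetical and `send_batch` only simulates a batch API; nothing here is part of pyvector or SQS. Note what is still missing: a disk buffer, shutdown handling, metrics and logging.

```python
import random
import time


def send_batch(batch, fail_rate=0.3, rng=random.Random(0)):
    """Hypothetical stand-in for a batch API such as an SQS batch send.

    Returns the messages that failed; the rest count as delivered.
    """
    return [msg for msg in batch if rng.random() < fail_rate]


def send_with_retries(messages, send=send_batch, batch_size=10,
                      max_attempts=5, base_delay=0.01, sleep=time.sleep):
    """Send messages in batches, retrying partial failures with backoff.

    Returns the messages that are still undelivered after max_attempts.
    """
    pending = list(messages)
    for attempt in range(max_attempts):
        if not pending:
            break
        if attempt:
            # Exponential backoff between rounds, capped at one second.
            sleep(min(base_delay * 2 ** (attempt - 1), 1.0))
        failed = []
        for start in range(0, len(pending), batch_size):
            # Only the messages reported as failed are kept for the next round.
            failed.extend(send(pending[start:start + batch_size]))
        pending = failed
    return pending


if __name__ == "__main__":
    undelivered = send_with_retries(range(100))
    print(f"{len(undelivered)} messages still undelivered")
```

Everything waiting for a retry lives in the `pending` list, so a crash or a long outage loses it. That is the gap a disk buffer is meant to close.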

What does this do?

This library embeds Vector in your Python process (no external process is needed) and provides a custom Python source that lets you send Python bytes to Vector with minimal copying.

You can then use any of the many available sinks to forward this data anywhere, with Vector handling all the complexities around batching, buffering to disk or memory, retries, rate-limiting, partitioning, authentication, backpressure and more.

The code below sends 1 million events to an SQS queue, an S3 bucket and an Elasticsearch cluster:

import uuid
import pyvector
import asyncio
import json

# Vector config: https://vector.dev/docs/reference/configuration/
config = """
[sources.python]
type = "python"

[sinks.s3]
type = "aws_s3"
inputs = ["python"]
bucket = "my-bucket"

[sinks.sqs]
type = "aws_sqs"
inputs = ["python"]
queue_url = "..."

[sinks.elasticsearch]
type = "elasticsearch"
inputs = ["python"]
endpoints = ["..."]
"""

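async def send_to_vector():
    # Start an in-process Vector instance with the config above.
    vector = pyvector.Vector(config)
    await vector.start()

    try:
        for i in range(1_000_000):
            # Each event is a JSON document encoded as bytes.
            data = json.dumps({"i": i, "uuid": str(uuid.uuid4())}).encode()
            # "python" must match the source name in the config.
            await vector.send(source="python", data=data)
    finally:
        # Stop Vector even if a send raises.
        await vector.stop()

asyncio.run(send_to_vector())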
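
The batching, disk buffering and retry behaviour mentioned earlier is controlled from the same config string. The following is a sketch only: the `batch`, `buffer` and `request` option names follow Vector's documentation for the `aws_s3` sink, but they have not been checked against the Vector version embedded in pyvector, and the `data_dir` path and all values are assumptions chosen for illustration. Like the example above, it leaves out options such as region and encoding that a real deployment may need.

```python
# Sketch only: values and data_dir are illustrative assumptions.
config = """
# Assumption: a writable directory where Vector keeps on-disk state.
data_dir = "/var/lib/vector"

[sources.python]
type = "python"

[sinks.s3]
type = "aws_s3"
inputs = ["python"]
bucket = "my-bucket"

# Flush a batch after roughly 10 MB or 60 seconds, whichever comes first.
batch.max_bytes = 10000000
batch.timeout_secs = 60

# Buffer unsent events on disk instead of in memory.
buffer.type = "disk"
buffer.max_size = 268435488
# Apply backpressure to the sender when the buffer is full.
buffer.when_full = "block"

# Retry failed requests, starting with a one second backoff.
request.retry_attempts = 10
request.retry_initial_backoff_secs = 1
"""
```

This string would be passed to `pyvector.Vector(config)` exactly as in the example above. Setting `buffer.when_full` to `"block"` favours not losing events over keeping the sender fast; `"drop_newest"` makes the opposite trade.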
