Skip to main content

Python client library for Opsqueue, the lightweight batch processing queue for heavy loads

Project description

The Python client library for the Opsqueue lightweight batch processing queue system.

Find the full README with examples at https://github.com/channable/opsqueue

Getting Started:

1. Grab the opsqueue binary and the Python client library

  1. Install the Opsqueue binary, using cargo install opsqueue (if you do not have Cargo/Rust installed yet, follow the instructions at https://rustup.rs/ first)
  2. Install the Python client using pip install opsqueue, uv install opsqueue or similar.

2. Create a Producer

import logging
from opsqueue.producer import ProducerClient
from collections.abc import Iterable

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)

def file_to_words(filename: str) -> Iterable[str]:
    """
    Iterates over each word and inter-word whitespace strings in a file
    while keeping at most one line in memory at a time.
    """
    with open(filename) as input_file:
        for line in input_file:
            for word in line.split():
                yield word

def print_words(words: Iterable[str]) -> None:
    """
    Prints all words and inter-word whitespace tokens
    without first loading the full string into memory
    """
    for word in words:
        print(word, end="")

def main() -> None:
    client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    stream_of_words = file_to_words("lipsum.txt")
    stream_of_capitalized_words = client.run_submission(stream_of_words, chunk_size=4000)
    print_words(stream_of_capitalized_words)

if __name__ == "__main__":
    main()

3. Create a Consumer

import logging
from opsqueue.consumer import ConsumerClient, Strategy

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)

def capitalize_word(word: str) -> str:
    output = word.capitalize()
    # print(f"Capitalized word: {word} -> {output}")
    return output

def main() -> None:
    client = ConsumerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    client.run_each_op(capitalize_word, strategy=Strategy.Random())

if __name__ == "__main__":
    main()
  1. Run the Producer, queue and Consumer
  • Run opsqueue.
  • Run python3 capitalize_text_consumer.py to run a consumer. Feel free to start multiple instances of this program to try out consumer concurrency.
  • Run python3 capitalize_text_producer.py to run a producer.

The order you start these in does not matter; systems will reconnect and continue after any kind of failure or disconnect.

By default the queue will listen on http://localhost:3999. The exact port can of course be changed. Producer and Consumer need to share the same object store location to store the content of their submission chunks. In development, this can be a local folder as shown in the code above. In production, you probably want to use Google's GCS, Amazon's S3 or Microsoft's Azure buckets.

Please tinker with above code! If you want more logging to look under the hood, run RUST_LOG=debug opsqueue to enable extra logging for the queue. The Producer/Consumer will use whatever log level is configured in Python.

More examples can be found in ./libs/opsqueue_python/examples/

More Info

Find the full README with examples at https://github.com/channable/opsqueue

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

opsqueue-0.30.3.tar.gz (157.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

opsqueue-0.30.3-cp312-cp312-manylinux_2_34_x86_64.whl (3.0 MB view details)

Uploaded CPython 3.12manylinux: glibc 2.34+ x86-64

File details

Details for the file opsqueue-0.30.3.tar.gz.

File metadata

  • Download URL: opsqueue-0.30.3.tar.gz
  • Upload date:
  • Size: 157.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: maturin/1.8.2

File hashes

Hashes for opsqueue-0.30.3.tar.gz
Algorithm Hash digest
SHA256 3c5bd6aca0053222ba69a3e94cc9801f35e25023995c40fcbbf60d4ddc85e4fa
MD5 988c16979bcbae77af9ef4dc4183c79f
BLAKE2b-256 cab5b3995013937052fdb37b4f977920d429ee99228fac708fbdba4014b392b9

See more details on using hashes here.

File details

Details for the file opsqueue-0.30.3-cp312-cp312-manylinux_2_34_x86_64.whl.

File metadata

File hashes

Hashes for opsqueue-0.30.3-cp312-cp312-manylinux_2_34_x86_64.whl
Algorithm Hash digest
SHA256 7ae3818b1912dda62088957b71b3f1727f461539e755c91c0c56960b3a29da30
MD5 44595541d8bf41ec27c580a92e2ee7af
BLAKE2b-256 f8ebaea291f102420e72e7d3d46061fb1dff007845344bca93c30f46f7dc2cc2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page