Python client library for Opsqueue, the lightweight batch processing queue for heavy loads
The Python client library for the Opsqueue lightweight batch processing queue system.
Find the full README with examples at https://github.com/channable/opsqueue
Getting Started:
1. Grab the opsqueue binary and the Python client library
- Install the Opsqueue binary using `cargo install opsqueue`. If you do not have Cargo/Rust installed yet, follow the instructions at https://rustup.rs/ first.
- Install the Python client using `pip install opsqueue`, `uv pip install opsqueue` or similar.
2. Create a Producer
import logging
from collections.abc import Iterable

from opsqueue.producer import ProducerClient

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)


def file_to_words(filename: str) -> Iterable[str]:
    """
    Iterates over each whitespace-separated word in a file
    while keeping at most one line in memory at a time.
    """
    with open(filename) as input_file:
        for line in input_file:
            # str.split() drops the whitespace between words
            for word in line.split():
                yield word


def print_words(words: Iterable[str]) -> None:
    """
    Prints all words, separated by spaces,
    without first loading the full output into memory.
    """
    for word in words:
        print(word, end=" ")


def main() -> None:
    client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    stream_of_words = file_to_words("lipsum.txt")
    stream_of_capitalized_words = client.run_submission(stream_of_words, chunk_size=4000)
    print_words(stream_of_capitalized_words)


if __name__ == "__main__":
    main()
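The `chunk_size=4000` argument above suggests that the stream of operations is split into chunks of that size. As a rough illustration of that idea only (this is not Opsqueue's actual implementation, and `chunked` is a hypothetical helper), a lazy iterable can be grouped into fixed-size chunks without loading everything into memory:

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def chunked(items: Iterable[str], chunk_size: int) -> Iterator[list[str]]:
    """Hypothetical helper: lazily group `items` into lists of at most `chunk_size`."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls at most chunk_size items, so only one chunk is in memory at a time.
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


if __name__ == "__main__":
    words = (f"word{i}" for i in range(10))
    for chunk in chunked(words, chunk_size=4):
        print(len(chunk), chunk[0])
```

The last chunk may be shorter than `chunk_size` when the number of items is not an exact multiple of it.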
3. Create a Consumer
import logging

from opsqueue.consumer import ConsumerClient, Strategy

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)


def capitalize_word(word: str) -> str:
    output = word.capitalize()
    # print(f"Capitalized word: {word} -> {output}")
    return output


def main() -> None:
    client = ConsumerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    client.run_each_op(capitalize_word, strategy=Strategy.Random())


if __name__ == "__main__":
    main()
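In the example above the consumer is handed a plain Python function, so that function can be tried out locally before connecting it to a queue. The sketch below assumes nothing about Opsqueue itself; it only calls the same `capitalize_word` logic on a few made-up sample inputs. Note that `str.capitalize()` also lowercases the rest of the word.

```python
def capitalize_word(word: str) -> str:
    # Same logic as the function passed to the consumer above.
    return word.capitalize()


if __name__ == "__main__":
    # Made-up sample inputs, including mixed case and the empty string.
    for sample in ["lorem", "iPSUM", "dolor-sit", ""]:
        print(repr(sample), "->", repr(capitalize_word(sample)))
```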
4. Run the Producer, queue and Consumer
- Run `opsqueue` to start the queue.
- Run `python3 capitalize_text_consumer.py` to run a consumer. Feel free to start multiple instances of this program to try out consumer concurrency.
- Run `python3 capitalize_text_producer.py` to run a producer.
The order you start these in does not matter; systems will reconnect and continue after any kind of failure or disconnect.
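The producer example reads its input from a file named lipsum.txt in the current working directory, so that file has to exist before the producer is started. A minimal sketch for generating some test input (the sample text and the `write_sample_file` helper are made up for illustration):

```python
from pathlib import Path

# Made-up sample text: one line of eight words.
SAMPLE_LINE = "lorem ipsum dolor sit amet consectetur adipiscing elit\n"


def write_sample_file(path: str = "lipsum.txt", lines: int = 1000) -> int:
    """Hypothetical helper: write `lines` copies of the sample line, return the word count."""
    Path(path).write_text(SAMPLE_LINE * lines, encoding="utf-8")
    return len(SAMPLE_LINE.split()) * lines


if __name__ == "__main__":
    word_count = write_sample_file()
    print(f"Wrote {word_count} words to lipsum.txt")
```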
By default the queue will listen on http://localhost:3999. The exact port can of course be changed.
Producer and Consumer need to share the same object store location to store the content of their submission chunks.
In development, this can be a local folder as shown in the code above.
In production, you probably want to use a cloud object store such as Google Cloud Storage (GCS), Amazon S3 or Azure Blob Storage.
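For local development, the shared location in the examples above is a `file://` URL pointing at a folder. Below is a small sketch of building such a URL from a path with the standard library. Whether Opsqueue needs the folder to exist beforehand is not stated here, so creating it up front is only a precaution, and `local_object_store_url` is a hypothetical helper:

```python
from pathlib import Path


def local_object_store_url(folder: str) -> str:
    """Hypothetical helper: create `folder` if needed and return it as a file:// URL."""
    path = Path(folder).resolve()
    path.mkdir(parents=True, exist_ok=True)
    # Path.as_uri() drops the trailing slash; the examples above use one, so add it back.
    return path.as_uri() + "/"


if __name__ == "__main__":
    print(local_object_store_url("/tmp/opsqueue/capitalize_text"))
```

The producer and the consumer would then both be given the same returned string as their second argument.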
Please tinker with the above code!
If you want more logging to look under the hood, run `RUST_LOG=debug opsqueue` to enable extra logging for the queue.
The Producer/Consumer will use whatever log level is configured in Python.
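Since the Python clients follow Python's logging configuration, the level can be chosen at startup. A minimal sketch that reads it from an environment variable; the variable name `LOG_LEVEL` is an assumption made for this example, not something Opsqueue defines:

```python
import logging
import os


def configure_logging(default: str = "INFO") -> int:
    """Configure the root logger from the (hypothetical) LOG_LEVEL environment variable."""
    level_name = os.environ.get("LOG_LEVEL", default).upper()
    # Fall back to INFO when the name is not a known logging level.
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        level = logging.INFO
    # force=True replaces any handlers set up by an earlier basicConfig call.
    logging.basicConfig(format="%(levelname)s: %(message)s", level=level, force=True)
    return level


if __name__ == "__main__":
    configure_logging()
    logging.getLogger(__name__).info("logging configured")
```

Calling `configure_logging()` in place of the `logging.basicConfig(...)` line in the producer or consumer script would let something like `LOG_LEVEL=debug python3 capitalize_text_consumer.py` switch verbosity without editing the code.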
More examples can be found in ./libs/opsqueue_python/examples/