Threading Queue
tqueue package
This library lets you run your tasks in multiple threads easily.
This is helpful when you have a lot of data to process.
Assume you have a large list of items to process. You write a producer that puts items into a queue one by one.
Workers get data from the queue and process it. Putting data into the queue should be quicker than processing it.
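The producer/worker pattern tqueue wraps can be sketched with the standard library alone; this is an illustration of the idea, not the library's implementation (all names here are made up):

```python
import queue
import threading

results = []

def worker(q):
    # Each worker pulls items until it sees the sentinel value None
    while True:
        item = q.get()
        if item is None:
            break
        results.append(item * 2)  # stand-in for real processing
        q.task_done()

q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q,)) for _ in range(4)]
for t in threads:
    t.start()

# The producer puts items into the queue one by one
for item in range(10):
    q.put(item)

for _ in threads:
    q.put(None)  # one sentinel per worker thread
for t in threads:
    t.join()

print(sorted(results))  # the doubled items; processing order depends on scheduling
```

tqueue takes care of this thread and queue management for you.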
Installation
```shell
pip install tqueue
```
Usage
- Import the library

```python
from tqueue import ThreadingQueue
```
- Create a worker
  - Create a worker function that takes the data as its first parameter
  - The worker can be a normal function or a coroutine function
  - The worker will be called in child threads

```python
def worker(data):
    pass

async def worker2(data):
    pass
```
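For instance, the two forms are interchangeable; the bodies below are illustrative (`data` is whatever your producer enqueues):

```python
import asyncio
import json

def worker(data):
    # Synchronous worker: parse a JSON line and return a field
    return json.loads(data)["id"]

async def worker2(data):
    # Coroutine worker: same logic, but free to await I/O
    await asyncio.sleep(0)
    return json.loads(data)["id"]

print(worker('{"id": 1}'))                # 1
print(asyncio.run(worker2('{"id": 2}')))  # 2
```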
- Apply the threading to a producer:
  - a. Set the number of threads and the worker
  - b. Put data into the queue

```python
async def producer():
    # Start the queue
    tq = ThreadingQueue(40, worker)
    ...
    await tq.put(data)
    ...
    tq.stop()
```
- You can also use ThreadingQueue as a context manager

```python
async def producer():
    # Start the queue
    with ThreadingQueue(40, worker) as tq:
        ...
        await tq.put(data)
```
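The advantage of the context-manager form is that the queue is stopped even if the producer raises. Conceptually it works like this sketch (illustration only, not tqueue's actual code):

```python
class SketchQueue:
    """Minimal sketch of why `with` guarantees cleanup."""

    def __init__(self):
        self.stopped = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.stop()   # runs even if the with-body raised
        return False  # do not swallow the exception

    def stop(self):
        self.stopped = True

tq = SketchQueue()
try:
    with tq:
        raise RuntimeError("producer failed")
except RuntimeError:
    pass

print(tq.stopped)  # True
```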
- Run the producer

```python
await producer()
```

or

```python
asyncio.run(producer())
```
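The difference between the two invocations: `await producer()` only works inside another coroutine, while `asyncio.run(producer())` starts an event loop from ordinary synchronous code. A stdlib-only illustration:

```python
import asyncio

async def producer():
    await asyncio.sleep(0)  # stand-in for queue work
    return "done"

# From synchronous code, start an event loop explicitly:
print(asyncio.run(producer()))  # done

# Inside another coroutine, await it directly:
async def main():
    return await producer()

print(asyncio.run(main()))  # done
```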
Note
- You can pass extra keyword params to all workers running in threads via worker_params.
- Apart from the number of threads and the worker, you can set log_dir to store logs to a file, and worker_params_builder to generate parameters for each worker.
- on_thread_close is an optional param: a function that is helpful when you need to close a database connection when a thread is done.
- Apart from all the above params, the rest of the keyword params will be passed to the worker.
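The per-thread setup that worker_params_builder provides (e.g. one database connection per thread) can be mimicked with thread-local storage; the sketch below uses the stdlib and sqlite3 purely for illustration:

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

local = threading.local()

def get_cursor():
    # Lazily create one connection per thread, like worker_params_builder
    if not hasattr(local, "conn"):
        local.conn = sqlite3.connect(":memory:")
        local.conn.execute("CREATE TABLE images (id INTEGER, width INTEGER)")
    return local.conn.cursor()

def worker(item):
    # Each thread reuses its own connection across items
    cur = get_cursor()
    cur.execute("INSERT INTO images VALUES (?, ?)", item)
    return item[0]

with ThreadPoolExecutor(max_workers=4) as pool:
    ids = list(pool.map(worker, [(i, i * 10) for i in range(5)]))

print(sorted(ids))  # [0, 1, 2, 3, 4]
```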
Example
```python
import json
import asyncio

import pymysql

from tqueue import ThreadingQueue

NUM_OF_THREADS = 40


def get_db_connection():
    return pymysql.connect(host='localhost',
                           user='root',
                           password='123456',
                           database='example',
                           cursorclass=pymysql.cursors.DictCursor)


# Build params for the worker; the params persist for the thread's lifetime
def worker_params_builder():
    # Each thread uses its own db connection
    conn = get_db_connection()
    conn.autocommit(1)
    cursor = conn.cursor()
    return {"cursor": cursor}


def worker(image_info, cursor, uid: int = 0):
    # Update image info in the database
    sql = "UPDATE images SET width = %s, height = %s, uid = %s WHERE id = %s"
    cursor.execute(sql, (image_info["width"], image_info["height"], uid, image_info["id"]))


async def producer(source_file: str):
    tq = ThreadingQueue(
        NUM_OF_THREADS, worker,
        log_dir="logs/update-images",
        worker_params_builder=worker_params_builder,
        params={"uid": 123},
    )
    with open(source_file, 'r') as f:
        for line in f:
            if not line:
                continue
            data = json.loads(line)
            await tq.put(data)
    tq.stop()


if __name__ == "__main__":
    asyncio.run(producer("images.jsonl"))
```
Development
Build project
- Update the version number in src/tqueue/__version__.py
- Update the changelog
- Build and publish the changes

```shell
python3 -m build
python3 -m twine upload dist/*
```
Release Information
Added
- Using the library as a context manager