PgQueuer is a Python library leveraging PostgreSQL for efficient job queuing.
🚀 PgQueuer - Building Smoother Workflows One Queue at a Time 🚀
📚 Documentation: Explore the Docs 📖
🔍 Source Code: View on GitHub 💾
PgQueuer
PgQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PgQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
Features
- Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
- Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
- Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.
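To illustrate the concurrency model, here is a simplified sketch of the FOR UPDATE SKIP LOCKED dequeue pattern. The table and column names below are hypothetical, not PgQueuer's actual schema; the point is that SKIP LOCKED lets many workers claim distinct rows without blocking each other:

```python
# Illustrative only: a simplified dequeue query in the FOR UPDATE SKIP LOCKED
# style. Table and column names are hypothetical, not PgQueuer's real schema.
DEQUEUE_SQL = """
UPDATE jobs
   SET status = 'picked'
 WHERE id = (
        SELECT id
          FROM jobs
         WHERE status = 'queued'
         ORDER BY priority DESC, id
         FOR UPDATE SKIP LOCKED
         LIMIT 1
       )
RETURNING id, entrypoint, payload
"""


def dequeue_statement() -> str:
    """Return the sketch statement. Rows locked by one worker are simply
    skipped by the next, so concurrent workers never contend for a job."""
    return DEQUEUE_SQL
```

Each worker would run such a statement in its own transaction; a locked (in-progress) row is invisible to the subquery, so two workers can never pick the same job.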
Installation
To install PgQueuer, run the following pip command:
pip install PgQueuer
Example Usage
Here's how you can use PgQueuer in a typical scenario processing incoming data messages:
import asyncio

import asyncpg

from PgQueuer.models import Job
from PgQueuer.qm import QueueManager


async def main() -> None:
    pool = await asyncpg.create_pool(min_size=2)
    qm = QueueManager(pool)

    # Enqueue messages.
    N = 1_000
    for n in range(N):
        await qm.queries.enqueue("fetch", f"this is from me: {n}".encode())

    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    await qm.run()


if __name__ == "__main__":
    asyncio.run(main())
Database Configuration
PgQueuer provides a command-line interface for easy management of installation and uninstallation. Ensure you have configured your environment variables or use the appropriate flags to specify your database credentials.
- Installing PgQueuer Database Components:
  python -m PgQueuer install
- Uninstalling PgQueuer Database Components:
  python -m PgQueuer uninstall
The CLI supports several flags to customize the connection settings. Use --help to see all available options.
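As an alternative to flags, the connection can often be configured through the standard libpq-style environment variables, which asyncpg (the driver PgQueuer builds on) reads when no explicit DSN is given. The values below are placeholders, not defaults PgQueuer itself documents:

```python
import os

# Placeholder credentials: the standard PostgreSQL client environment
# variables, which asyncpg consults when connection details are omitted.
settings = {
    "PGHOST": "localhost",
    "PGPORT": "5432",
    "PGUSER": "postgres",
    "PGDATABASE": "postgres",
}
os.environ.update(settings)

print(os.environ["PGHOST"])  # localhost
```

Setting these once in the shell (e.g. via export) lets both the CLI commands and your application code share one configuration.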
Dashboard Command
The dashboard command provides a real-time view of job processing statistics, which can be refreshed at a specified interval. This is particularly useful for monitoring the status of jobs dynamically. Below are the options available for customizing the dashboard display:
- --interval <seconds>: Set the refresh interval in seconds for updating the dashboard display. If not set, the dashboard will update only once and then exit.
- --tail <number>: Specify the number of the most recent log entries to display.
- --table-format <format>: Choose the format of the table used to display statistics. Supported formats include grid, plain, html, and others provided by the tabulate library.
Example command to launch the dashboard:
python -m PgQueuer dashboard --interval 10 --tail 25 --table-format grid
Example output from the dashboard:
+---------------------------+-------+------------+--------------------------+------------+----------+
| Created | Count | Entrypoint | Time in Queue (HH:MM:SS) | Status | Priority |
+---------------------------+-------+------------+--------------------------+------------+----------+
| 2024-05-05 16:44:26+00:00 | 49 | sync | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:26+00:00 | 82 | async | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:26+00:00 | 1615 | sync | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:26+00:00 | 1586 | async | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:25+00:00 | 198 | sync | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:25+00:00 | 230 | async | 0:00:01 | successful | 0 |
| 2024-05-05 16:44:25+00:00 | 1802 | sync | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:25+00:00 | 1778 | async | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:24+00:00 | 1500 | sync | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:24+00:00 | 1506 | async | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:23+00:00 | 1505 | async | 0:00:00 | successful | 0 |
| 2024-05-05 16:44:23+00:00 | 1500 | sync | 0:00:00 | successful | 0 |
+---------------------------+-------+------------+--------------------------+------------+----------+
Listen Command
The listen command allows you to monitor PostgreSQL NOTIFY messages in real time on a specified channel. This feature is particularly useful for debugging and observing the raw event traffic that your application handles.
Usage:
python -m PgQueuer listen
Example output from listen:
Event(channel='ch_pgqueuer', operation='truncate', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 123135, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 129818, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='insert', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 129978, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 155611, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='insert', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 155634, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 156301, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 171691, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 173794, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 171702, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 174455, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 171704, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 174875, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 174537, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 175312, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 173397, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 175468, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 173748, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 175861, tzinfo=datetime.timezone.utc))
Event(channel='ch_pgqueuer', operation='delete', sent_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 174547, tzinfo=TzInfo(UTC)), table='pgqueuer', received_at=datetime.datetime(2024, 5, 11, 20, 15, 16, 176460, tzinfo=datetime.timezone.utc))
Benchmark Summary
PgQueuer underwent basic benchmark testing to assess its performance across varying job volumes and concurrency levels.
Key Observations:
- Scalability: Performance increases with higher concurrency, demonstrating the library's ability to efficiently manage larger workloads.
- Consistency: PgQueuer maintains consistent throughput across different job counts, ensuring reliable performance.
- Optimal Performance: The highest throughput observed was ~5,200 jobs per second at a concurrency level of 5.
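A quick back-of-envelope check of the peak figure above (assuming throughput splits evenly across concurrent workers):

```python
# Sanity-check arithmetic on the benchmark numbers quoted above.
peak_jobs_per_second = 5200   # observed at concurrency level 5
concurrency = 5

per_worker = peak_jobs_per_second / concurrency
print(per_worker)  # 1040.0 jobs per second per concurrent worker
```

That is, each worker sustains on the order of a thousand jobs per second at the observed peak.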