Multisync task manager for python
Project description
🍈 Melony
Modern task manager for python with types, async and sync support!
Features
- Automatically asynchronous or synchronous, depending on provided message broker connection
- Simple for users, simple for developers
- Fully checked with
we-make-python-styleguide - Fully typed for users
- Scaled by processes
- Scaled automatically
- Retry policy (cascade or simmilar soon)
- Revocable tasks (pipelines)
- Cron tasks
- Powerful UI with analytics, full task control and alerts
- 100% test coverage
- Great docs
- No AI
To be continue
Quickstart
Initialize melony broker at tasks.py and declare your tasks:
import time
from melony import RedisBroker
from redis import Redis
broker = RedisBroker(redis_connection=Redis(host='localhost', port=6379))
@broker.task(queue='notifications', retries=2, retry_timeout=30)
def example_task(string_param: str) -> str:
time.sleep(5)
return string_param.upper()
Delay your task anywhere:
example_task(string_param='Im string param').delay(countdown=30)
Run your consumer at consumer.py
from tasks import broker
broker.consumer.start_consume(processes=2)
Avaible brokers
- Redis
- RabbitMQ
- Kafka
To be continue
Installation
Using pip:
pip install melony
Using uv:
uv add melony
Documentation
Brokers
First of all you need to choose your broker. At this moment, you are able to use only RedisBroker, but very soon they will be more. Your application can have any numbers of melony brokers. You don't have to choose broker by async/sync parameter or import broker from io/sync packages of this lib, just import broker and create it. Thats works for all melony entities AT ALL! You don't have to think about this. So, let's initialize your selected broker:
from melony import RedisBroker
from redis.asyncio import Redis
from redis import Redis as SyncRedis
# async
async_redis_connection = Redis(host='localhost', port=6379, db=0)
broker = RedisBroker(redis_connection=redis_connection)
# sync
sync_redis_connection = SyncRedis(host='localhost', port=6379, db=0) # Other connection here
broker = RedisBroker(redis_connection=redis_connection)
Also you are able to provide result backend to your broker. At this moment, you are able to use only RedisResultBackend. Result backend save your task results (return values) to selected database.
from melony import RedisBroker, RedisResultBackend
from redis.asyncio import Redis
from redis import Redis as SyncRedis
# async
async_redis_connection = Redis(host='localhost', port=6379, db=0)
result_backend = RedisResultBackend(redis_connection=async_redis_connection)
broker = RedisBroker(redis_connection=redis_connection, result_backend=result_backend)
# sync
sync_redis_connection = SyncRedis(host='localhost', port=6379, db=0)
result_backend = RedisResultBackend(redis_connection=sync_redis_connection)
broker = RedisBroker(redis_connection=redis_connection)
Task declaration
After your broker initilization, you are able to register task for next delaying and execution using special decorator (broker method) .task(), which can recieved 3 arguments: queue (string 'default' by default), retries (default=1) and retry_timeout (default=0). Your task function must be async if you're using io message broker client and sync if it's not.
# async
import asyncio
from melony import RedisBroker, RedisResultBackend
from redis.asyncio import Redis
redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)
@broker.task(retries=2, retry_timeout=30)
async def async_task(string_param: str) -> str:
await asyncio.sleep(2)
return string_param.upper()
# sync
import time
from melony import RedisBroker, RedisResultBackend
from redis import Redis
redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)
@broker.task(retries=2, retry_timeout=30)
def sync_task(string_param: str) -> str:
time.sleep(2)
return string_param.upper()
You should remember, that parameter retry_timeout doesn't guarantee that your task will be executed literally after seconds you selected (depends from your messages value in queue). If you need more accuracy, try to configurate your consumer instances (see consumer documentation: processes parameter).
Task delaying
After your tasks declaration, you are able to delay your tasks for next execution by countdown time. For doing this, you need to call your task as usually, then call special TaskWrapper method. This method calls .delay().
# async
await async_task(string_param='I am async task with 15 sec coundown').dalay(countdown=15)
# sync
sync_task(string_param='I am sync task with 30 sec coundown').execute(countdown=30)
Attention: for this moment you are able to delaying tasks only for 24 hours maximum. You will have opportunity to delay your tasks more then 24 hours by postgres/rabbitmq/kafka broker for long life tasks. Now you should use postgres, for example, to save your task list in Celery =(. So, i'm working on it.
Task execution normally
If your function is decorated by @broker.task decorator, you still are able to execute this task usually by special method .execute() instead .delay(). This mehod has no arguments and needed for immidiatly execuiton your function as common python function.
# async
await async_task(string_param='I am async task for immidiatly execuiton').execute()
# sync
sync_task(string_param='I am sync task for immidiatly execuiton').execute()
Consuming process
Consumers need for listening messages from message broker, executing your tasks and writing result backend to selected ResultBackend.
To get your consumer instance use .consumer property of your selected broker.
For start consuming your tasks you need to call .start_consume() method. This method recieved 2 arguments: queue (by default 'default'), processes (by default 1). There is no recomendation how many procceses you should use. It depends from many things. So, just try to understand optimal value imperatively.
# async
await broker.consumer.start_consume(queue='main', processes=2)
# sync
broker.consumer.start_consume(processes=3)
So, if you need to listening many message brokers, you have apportunity for doing this. Also one consumer intance can listening many queues:
# consumer1.py
broker.consumer.start_consume(queue='main')
# consumer2.py
broker.consumer.start_consume(queue='notifications')
Cron tasks
Cron tasks are scheduled functions that run automatically on a time-based schedule. Declare them the same way as regular tasks — just add the cron parameter with a standard cron expression. The task is registered in the broker at decoration time and picked up by the cron consumer at startup.
# async
import asyncio
from melony import RedisBroker
from redis.asyncio import Redis
redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)
@broker.task(cron='* * * * *', retries=2, retry_timeout=5)
async def send_report() -> None:
await asyncio.sleep(1)
print('report sent')
# sync
from melony import RedisBroker
from redis import Redis
redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)
@broker.task(cron='0 9 * * 1-5', retries=3, retry_timeout=10)
def send_report() -> None:
print('report sent')
A cron task still works as a regular task — you can call .delay() on it at any time outside of the schedule.
To start executing scheduled tasks, run the cron consumer using the .cron_consumer property of your broker. It accepts the same queue and processes arguments as the regular consumer.
# async
await broker.cron_consumer.start_consume(queue='default', processes=1)
# sync
broker.cron_consumer.start_consume(queue='default', processes=1)
The cron consumer and the regular consumer are independent — you can run both in parallel, each listening to its own queue type.
# consumer.py — regular tasks
broker.consumer.start_consume(queue='main')
# cron_consumer.py — scheduled tasks
broker.cron_consumer.start_consume(queue='main')
If a cron task raises an exception, it is retried up to retries times with retry_timeout seconds between attempts. After success or all retries are exhausted, the task is automatically rescheduled for its next cron tick.
An invalid cron expression raises ValueError immediately at decoration time:
@broker.task(cron='not-valid') # raises ValueError: Invalid cron expression: 'not-valid'
def my_cron_task() -> None:
...
For developers
WRITING...
Contributing
See CONTRIBUTING.md
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file melony-1.3.1.tar.gz.
File metadata
- Download URL: melony-1.3.1.tar.gz
- Upload date:
- Size: 22.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
6f654138fa7fbe89883126972d14ad277feb222ecb4b261fb516820e7f7ce2fd
|
|
| MD5 |
e6365592c4cb0b74fe430e8535875874
|
|
| BLAKE2b-256 |
1fa682935a8b7e4b5526b30aa6feba2e2a6acb1b457babd006e7b4ac8b19b181
|
File details
Details for the file melony-1.3.1-py3-none-any.whl.
File metadata
- Download URL: melony-1.3.1-py3-none-any.whl
- Upload date:
- Size: 27.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.10.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ccba97c1671d70967aaa004e0f32f19ca8968725ce776859a819832f827e035f
|
|
| MD5 |
54bb007f37d90f5c8e27ad194eab5429
|
|
| BLAKE2b-256 |
810b0f3348b1125ee70d3fe3a4bbdf92229fbb28b4b1a1ef2ac165e6cb593565
|