
Multisync task manager for Python


🍈 Melony

Modern task manager for Python with full typing and both async and sync support!


Features

  • Automatically asynchronous or synchronous, depending on the provided message broker connection
  • Simple for users, simple for developers
  • Fully checked with wemake-python-styleguide
  • Fully typed for users
  • Scales across processes
  • Scales automatically
  • Retry policy (cascading and similar policies coming soon)
  • Revocable tasks (pipelines)
  • Cron tasks
  • Powerful UI with analytics, full task control and alerts
  • 100% test coverage
  • Great docs
  • No AI

To be continued…

Quickstart

Initialize a Melony broker in tasks.py and declare your tasks:

import time
from melony import RedisBroker
from redis import Redis

broker = RedisBroker(redis_connection=Redis(host='localhost', port=6379))

@broker.task(queue='notifications', retries=2, retry_timeout=30)
def example_task(string_param: str) -> str:
    time.sleep(5)
    return string_param.upper()

Delay your task anywhere:

example_task(string_param="I'm a string param").delay(countdown=30)

Run your consumer in consumer.py:

from tasks import broker

broker.consumer.start_consume(processes=2)

Available brokers

  • Redis
  • RabbitMQ
  • Kafka

To be continued…

Installation

Using pip:

pip install melony

Using uv:

uv add melony

Documentation

Brokers

First of all, you need to choose your broker. At the moment only RedisBroker is available, but more are coming soon. Your application can have any number of Melony brokers. You don't have to choose a broker by an async/sync parameter or import it from separate io/sync packages of this library: just import the broker and create it, and Melony detects the mode from the connection you pass. That works for all Melony entities, so you never have to think about it. So, let's initialize your selected broker:

from melony import RedisBroker
from redis.asyncio import Redis
from redis import Redis as SyncRedis

# async
async_redis_connection = Redis(host='localhost', port=6379, db=0)
broker = RedisBroker(redis_connection=async_redis_connection)

# sync
sync_redis_connection = SyncRedis(host='localhost', port=6379, db=0)
broker = RedisBroker(redis_connection=sync_redis_connection)

You can also provide a result backend to your broker. At the moment only RedisResultBackend is available. A result backend saves your task results (return values) to the selected database.

from melony import RedisBroker, RedisResultBackend
from redis.asyncio import Redis
from redis import Redis as SyncRedis

# async
async_redis_connection = Redis(host='localhost', port=6379, db=0)
result_backend = RedisResultBackend(redis_connection=async_redis_connection)
broker = RedisBroker(redis_connection=async_redis_connection, result_backend=result_backend)

# sync
sync_redis_connection = SyncRedis(host='localhost', port=6379, db=0)
result_backend = RedisResultBackend(redis_connection=sync_redis_connection)
broker = RedisBroker(redis_connection=sync_redis_connection, result_backend=result_backend)
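The idea behind a result backend can be sketched with a minimal in-memory store. This is illustrative only: the class and method names here (InMemoryResultBackend, save, get) are hypothetical, not Melony's API, and the real RedisResultBackend persists results to Redis rather than a dict.

```python
class InMemoryResultBackend:
    """Toy result backend: stores task return values keyed by task id."""

    def __init__(self):
        self._results = {}  # task_id -> return value

    def save(self, task_id, value):
        self._results[task_id] = value

    def get(self, task_id):
        # Returns None for unknown task ids, mimicking "no result yet".
        return self._results.get(task_id)


backend = InMemoryResultBackend()
backend.save('task-42', 'HELLO')
print(backend.get('task-42'))  # HELLO
```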

Task declaration

After initializing your broker, you can register tasks for later delaying and execution using a special decorator (broker method) .task(), which accepts three arguments: queue (string, 'default' by default), retries (default 1) and retry_timeout (default 0). Your task function must be async if you're using an async message broker client, and sync otherwise.

# async
import asyncio

from melony import RedisBroker, RedisResultBackend
from redis.asyncio import Redis

redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)


@broker.task(retries=2, retry_timeout=30)
async def async_task(string_param: str) -> str:
    await asyncio.sleep(2)
    return string_param.upper()

# sync
import time

from melony import RedisBroker, RedisResultBackend
from redis import Redis

redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)


@broker.task(retries=2, retry_timeout=30)
def sync_task(string_param: str) -> str:
    time.sleep(2)
    return string_param.upper()

Keep in mind that the retry_timeout parameter doesn't guarantee your task will be retried exactly after the number of seconds you selected (it depends on how many messages are in the queue). If you need more accuracy, try tuning your consumer instances (see the consumer documentation: the processes parameter).
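The retries/retry_timeout semantics can be sketched as a plain decorator. This is a toy illustration, not Melony's implementation: a real broker re-queues the failed message, whereas here time.sleep stands in for the wait between attempts, and the helper name with_retries is hypothetical.

```python
import time
from functools import wraps


def with_retries(retries=1, retry_timeout=0):
    """Re-run the wrapped function up to `retries` extra times,
    waiting `retry_timeout` seconds between attempts (toy sketch)."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise  # retries exhausted, re-raise the last error
                    time.sleep(retry_timeout)
        return wrapper
    return decorator


attempts = []


@with_retries(retries=2, retry_timeout=0)
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError('transient failure')
    return 'ok'


result = flaky()
print(result)  # ok, after two failed attempts
```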

Task delaying

After declaring your tasks, you can delay them for later execution by a countdown. To do this, call your task as usual, then call the special TaskWrapper method .delay().

# async
await async_task(string_param='I am an async task with a 15 sec countdown').delay(countdown=15)

# sync
sync_task(string_param='I am a sync task with a 30 sec countdown').delay(countdown=30)
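The call-then-.delay() pattern can be illustrated with a toy wrapper. All names here (TaskWrapper fields, the task decorator, the queue list) are hypothetical stand-ins, not Melony internals: calling the decorated function captures its arguments, and .delay() is where a real broker would enqueue a message.

```python
from dataclasses import dataclass
from typing import Any, Callable

queue = []  # stand-in for a message broker queue


@dataclass
class TaskWrapper:
    """Holds a function and its arguments until .delay() or .execute()."""
    func: Callable[..., Any]
    args: tuple
    kwargs: dict

    def execute(self):
        # Run immediately, like a plain function call.
        return self.func(*self.args, **self.kwargs)

    def delay(self, countdown=0):
        # A real broker would publish a message; here we just record it.
        queue.append((self.func.__name__, countdown))


def task(func):
    def caller(*args, **kwargs):
        return TaskWrapper(func, args, kwargs)
    return caller


@task
def greet(name):
    return f'hello {name}'


greet(name='melony').delay(countdown=15)
print(queue)  # [('greet', 15)]
print(greet(name='x').execute())  # hello x
```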

Attention: at the moment you can delay tasks by 24 hours at most. Delaying tasks beyond 24 hours via a postgres/rabbitmq/kafka broker for long-lived tasks is planned; for now you would have to persist such a task list yourself (for example in Postgres), as in Celery =(. So, I'm working on it.

Normal task execution

If your function is decorated with the @broker.task decorator, you can still execute it directly with the special method .execute() instead of .delay(). This method takes no arguments and runs your function immediately, like a common Python function.

# async
await async_task(string_param='I am an async task for immediate execution').execute()

# sync
sync_task(string_param='I am a sync task for immediate execution').execute()

Consuming process

Consumers listen for messages from the message broker, execute your tasks and write the results to the selected ResultBackend. To get your consumer instance, use the .consumer property of your selected broker. To start consuming your tasks, call the .start_consume() method. This method accepts two arguments: queue (by default 'default') and processes (by default 1). There is no general recommendation for how many processes to use; it depends on many factors, so experiment to find the optimal value.

# async
await broker.consumer.start_consume(queue='main', processes=2)

# sync
broker.consumer.start_consume(processes=3)
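How a consumer fans tasks out across workers can be sketched with a worker pool. This is not Melony's implementation: threads stand in for processes so the example stays self-contained, the function name consume is hypothetical, and a real consumer blocks on the broker instead of draining a local queue.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue


def consume(task_queue, workers=2):
    """Drain `task_queue`, executing each (func, arg) pair on a pool of
    `workers` -- a toy stand-in for start_consume(processes=N)."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = []
        while not task_queue.empty():
            func, arg = task_queue.get()
            futures.append(pool.submit(func, arg))
        # Collect results in submission order.
        return [f.result() for f in futures]


q = Queue()
for word in ('alpha', 'beta', 'gamma'):
    q.put((str.upper, word))

results = consume(q, workers=2)
print(results)  # ['ALPHA', 'BETA', 'GAMMA']
```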

So, if you need to listen to several message brokers, you can. One consumer instance can also listen to several queues:

# consumer1.py
broker.consumer.start_consume(queue='main')

# consumer2.py
broker.consumer.start_consume(queue='notifications')

Cron tasks

Cron tasks are scheduled functions that run automatically on a time-based schedule. Declare them the same way as regular tasks — just add the cron parameter with a standard cron expression. The task is registered in the broker at decoration time and picked up by the cron consumer at startup.

# async
import asyncio

from melony import RedisBroker
from redis.asyncio import Redis

redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)


@broker.task(cron='* * * * *', retries=2, retry_timeout=5)
async def send_report() -> None:
    await asyncio.sleep(1)
    print('report sent')

# sync
from melony import RedisBroker
from redis import Redis

redis_connection = Redis(host='localhost', port=6379)
broker = RedisBroker(redis_connection=redis_connection)


@broker.task(cron='0 9 * * 1-5', retries=3, retry_timeout=10)
def send_report() -> None:
    print('report sent')
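The semantics of a five-field cron expression (minute, hour, day-of-month, month, day-of-week) can be sketched with a minimal matcher. This is illustrative only, covering just '*', plain numbers and ranges like '1-5'; the function names are hypothetical and Melony's actual parser may differ.

```python
from datetime import datetime


def _field_matches(field, value):
    """Match one cron field: '*', a number, or a range like '1-5'."""
    if field == '*':
        return True
    if '-' in field:
        lo, hi = map(int, field.split('-'))
        return lo <= value <= hi
    return int(field) == value


def cron_matches(expr, dt):
    """True if `dt` falls on the schedule `expr` (minute hour dom month dow)."""
    minute, hour, dom, month, dow = expr.split()
    return (
        _field_matches(minute, dt.minute)
        and _field_matches(hour, dt.hour)
        and _field_matches(dom, dt.day)
        and _field_matches(month, dt.month)
        and _field_matches(dow, dt.isoweekday() % 7)  # cron: 0 = Sunday
    )


# '0 9 * * 1-5' means 09:00 on weekdays
print(cron_matches('0 9 * * 1-5', datetime(2024, 6, 3, 9, 0)))  # Monday -> True
print(cron_matches('0 9 * * 1-5', datetime(2024, 6, 2, 9, 0)))  # Sunday -> False
```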

A cron task still works as a regular task — you can call .delay() on it at any time outside of the schedule.

To start executing scheduled tasks, run the cron consumer using the .cron_consumer property of your broker. It accepts the same queue and processes arguments as the regular consumer.

# async
await broker.cron_consumer.start_consume(queue='default', processes=1)

# sync
broker.cron_consumer.start_consume(queue='default', processes=1)

The cron consumer and the regular consumer are independent — you can run both in parallel, each listening to its own queue type.

# consumer.py — regular tasks
broker.consumer.start_consume(queue='main')

# cron_consumer.py — scheduled tasks
broker.cron_consumer.start_consume(queue='main')

If a cron task raises an exception, it is retried up to retries times with retry_timeout seconds between attempts. After success or all retries are exhausted, the task is automatically rescheduled for its next cron tick.

An invalid cron expression raises ValueError immediately at decoration time:

@broker.task(cron='not-valid')  # raises ValueError: Invalid cron expression: 'not-valid'
def my_cron_task() -> None:
    ...

For developers

WRITING...

Contributing

See CONTRIBUTING.md

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

melony-1.3.1.tar.gz (22.4 kB)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

melony-1.3.1-py3-none-any.whl (27.2 kB)

Uploaded Python 3

File details

Details for the file melony-1.3.1.tar.gz.

File metadata

  • Download URL: melony-1.3.1.tar.gz
  • Upload date:
  • Size: 22.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for melony-1.3.1.tar.gz
Algorithm Hash digest
SHA256 6f654138fa7fbe89883126972d14ad277feb222ecb4b261fb516820e7f7ce2fd
MD5 e6365592c4cb0b74fe430e8535875874
BLAKE2b-256 1fa682935a8b7e4b5526b30aa6feba2e2a6acb1b457babd006e7b4ac8b19b181

See more details on using hashes here.

File details

Details for the file melony-1.3.1-py3-none-any.whl.

File metadata

  • Download URL: melony-1.3.1-py3-none-any.whl
  • Upload date:
  • Size: 27.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.12

File hashes

Hashes for melony-1.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ccba97c1671d70967aaa004e0f32f19ca8968725ce776859a819832f827e035f
MD5 54bb007f37d90f5c8e27ad194eab5429
BLAKE2b-256 810b0f3348b1125ee70d3fe3a4bbdf92229fbb28b4b1a1ef2ac165e6cb593565

See more details on using hashes here.
