A simplified, powerful interface for distributed task management in Python, built on Celery.

FlowGrid

FlowGrid is a Python library designed to improve parallelization across multiple machines with a powerful and user-friendly interface, allowing you to focus on your application logic without worrying about the complexities of task distribution and management.

Motivation

The primary goal of FlowGrid is to provide an easy-to-use interface for parallelizing Python tasks across multiple workers.

In Python, a common design pattern is to launch asynchronous tasks while responding to the user immediately. FastAPI, for instance, addresses this with background_tasks, though it acknowledges that this method may not be the most reliable, especially for long-running tasks.

FlowGrid abstracts the complexity of task management, progress tracking, and task cancellation, making it easier to build scalable and robust distributed systems. This library is also designed to seamlessly integrate with popular frameworks like FastAPI, Flask, and Django, solving common concurrency challenges effortlessly.

While other solutions like Celery (which FlowGrid uses under the hood) are available, they often require more configuration and are more challenging to use. FlowGrid simplifies the process while maintaining the full power of Celery, with additional enhancements such as native task state checking and improved handling of task cancellations, even for tasks in progress.

Installation

FlowGrid is published on PyPI as py-flowgrid. You can install it using pip:

pip install py-flowgrid

FlowGrid requires Python 3.7 or higher.

Basic Usage

Defining and Launching Tasks

FlowGrid simplifies task management in Celery by allowing you to define tasks using decorators and manage them seamlessly. Here's an example:

import time
from flowgrid import FlowGrid

fg = FlowGrid()

@fg.task
def add(x: float, y: float) -> float:
    # Simulate a long running task
    time.sleep(10)
    return x + y

def main():
    task = add(1, 2)
    # task_id is None because the task has not been launched yet
    print('TASK:', task.task_id)

    # You can explicitly launch the task or let the
    # wait function do it for you
    # task = fg.launch(task)  # Can be uncommented

    # At this point the task id is available
    # print('TASK:', task.task_id)  # Can be uncommented
    response = fg.wait(task)
    print('RESPONSE:', response)

if __name__ == '__main__':
    main()
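
To make the deferred-launch behaviour above concrete, here is a toy, in-process stand-in. It is not FlowGrid's implementation and does not use Celery; the names PendingTask, task, launch and wait are hypothetical and exist only to illustrate why task_id stays None until the task is launched.

```python
import functools
import uuid
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class PendingTask:
    """Hypothetical stand-in for the object returned by a decorated call."""
    func: Callable[..., Any]
    args: tuple
    kwargs: dict
    task_id: Optional[str] = None  # assigned only when the task is launched


def task(func: Callable[..., Any]) -> Callable[..., PendingTask]:
    """Calling the decorated function records the call instead of running it."""
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> PendingTask:
        return PendingTask(func, args, kwargs)
    return wrapper


def launch(pending: PendingTask) -> PendingTask:
    """Assign an id the first time the task is launched."""
    if pending.task_id is None:
        pending.task_id = str(uuid.uuid4())
    return pending


def wait(pending: PendingTask) -> Any:
    """Launch if needed, then run the function locally and return its result."""
    launch(pending)
    return pending.func(*pending.args, **pending.kwargs)


@task
def add(x: float, y: float) -> float:
    return x + y


pending = add(1, 2)           # nothing has run yet
id_before = pending.task_id   # still None at this point
result = wait(pending)        # launches, runs, and returns the sum
id_after = pending.task_id    # now a generated identifier
print(id_before, result, id_after is not None)
```

In the real library the work runs on a worker rather than in the calling process; the sketch only mirrors the order of events visible to the caller.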

Real-Time Progress Updates

FlowGrid supports real-time progress tracking for your tasks. You can update and monitor the progress easily:

import time
from flowgrid import FlowGrid

fg = FlowGrid()

@fg.task
def add_multiple(x: float, y: float, times: int = 10) -> float:
    response = x
    for i in range(times):
        time.sleep(1)
        fg.update(progress=i, total=times, percent=100*i/times)
        response += y
    return response

def main():
    task = add_multiple(10, 5, times=5)
    response = fg.wait(task)  # Expected: 10 + 5*5 = 35
    print('RESPONSE:', response)

if __name__ == '__main__':
    main()
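
The values reported by the loop above can be checked without a worker. The sketch below only reproduces the arithmetic passed to fg.update for times=5; the helper name progress_updates is hypothetical and is not part of FlowGrid.

```python
from typing import Dict, List, Union

Number = Union[int, float]


def progress_updates(times: int) -> List[Dict[str, Number]]:
    """Return the keyword arguments the loop would pass to fg.update, in order."""
    return [
        {"progress": i, "total": times, "percent": 100 * i / times}
        for i in range(times)
    ]


updates = progress_updates(5)
for update in updates:
    print(update)
```

With this indexing the final update reports 80 percent, so a consumer that needs to display 100 percent would have to treat task completion as the last step, or the task could report i + 1 instead of i.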

Task Cancellation

FlowGrid allows you to cancel tasks, either forcefully or gracefully:

Forceful Cancellation

import time
from flowgrid import FlowGrid

fg = FlowGrid()

@fg.task
def add_multiple(x: float, y: float, times: int = 10) -> float:
    response = x
    for i in range(times):
        time.sleep(1)
        fg.update(progress=i, total=times, percent=100*i/times)
        response += y
    return response

def main():
    task = fg.launch(add_multiple(10, 5, times=5))

    time.sleep(3)

    # Simulating a user cancelling the task
    print('CANCELLING TASK')
    # force=True will terminate the task immediately
    fg.revoke(task, force=True)

if __name__ == '__main__':
    main()

Graceful Cancellation

import time
from flowgrid import FlowGrid

fg = FlowGrid()

@fg.task
def add_multiple(x: float, y: float, times: int = 10) -> float:
    response = x
    for i in range(times):
        # Check for revocation and stop if needed
        if fg.is_revoked():
            print('CANCELLED')
            return
        time.sleep(1)
        fg.update(progress=i, total=times, percent=100*i/times)
        response += y
    return response

def main():
    task = fg.launch(add_multiple(10, 5, times=5))
    print('TASK:', task.task_id)

    time.sleep(3)

    # Simulating a user cancelling the task
    print('CANCELLING TASK')
    # Graceful cancellation
    fg.revoke(task)

if __name__ == '__main__':
    main()
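
The graceful variant relies on the task checking a flag between steps. The following is a minimal in-process sketch of that cooperative pattern, with threading.Event standing in for fg.is_revoked(). The after_step hook is a hypothetical addition used here only to trigger the cancellation deterministically; it is not part of FlowGrid.

```python
import threading
from typing import Callable, List, Optional


def add_multiple(
    x: float,
    y: float,
    times: int,
    cancelled: threading.Event,
    after_step: Optional[Callable[[int], None]] = None,
) -> Optional[float]:
    """Add y to x `times` times, stopping early if `cancelled` is set."""
    response = x
    for i in range(times):
        # Cooperative check: the task itself decides where it is safe to stop.
        if cancelled.is_set():
            return None
        response += y
        if after_step is not None:
            after_step(i)
    return response


# Run to completion: the flag is never set.
completed = add_multiple(10, 5, 5, threading.Event())

# Cancel after the second step has finished.
cancel_flag = threading.Event()
steps_run: List[int] = []


def cancel_after_two(step: int) -> None:
    steps_run.append(step)
    if step == 1:
        cancel_flag.set()


cancelled_result = add_multiple(10, 5, 5, cancel_flag, cancel_after_two)
print(completed, cancelled_result, steps_run)
```

The trade-off against forceful cancellation is that the task stops only at the points where it checks the flag, which gives it a chance to clean up but delays the stop until the current step ends.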

Worker Management

FlowGrid workers can be launched with the command:

flowgrid worker --app <path-to-your-flowgrid-class>

You can also use the -A shorthand for --app:

flowgrid worker -A <path-to-your-flowgrid-class>

For example, to start workers for the first example in this repository, run:

flowgrid worker -A examples.01-base.fg

The .fg suffix is required because fg is the name of the FlowGrid instance defined in examples/01-base.py.
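
One common way a dotted path like this is interpreted is to split it at the last dot into a module path and an attribute name. The sketch below shows that convention using only the standard library; resolve_app is a hypothetical helper, and FlowGrid's actual loader may work differently.

```python
import importlib
from typing import Any


def resolve_app(path: str) -> Any:
    """Import the module part of `path` and return the named attribute."""
    module_name, _, attribute = path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected '<module>.<attribute>', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attribute)


# Demonstrated on a standard-library path so the example runs anywhere.
resolved = resolve_app("json.decoder.JSONDecoder")
print(resolved)
```

Under this convention, examples.01-base.fg would mean "import the module examples.01-base and take its fg attribute", which is why dropping the suffix would leave nothing to identify the instance.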

To view all available options, use:

flowgrid worker -h

Some useful options include:

  • Concurrency: Control the number of worker processes with --concurrency or -c.
  • Log Level: Set the logging level with --loglevel or -l.
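
When workers are started from a script, the options above can be assembled into an argument list. This is a small sketch, not part of FlowGrid: worker_command is a hypothetical helper, and the values 4 and "info" are illustrative assumptions (check flowgrid worker -h for the accepted log levels).

```python
import shlex
from typing import List, Optional


def worker_command(
    app: str,
    concurrency: Optional[int] = None,
    loglevel: Optional[str] = None,
) -> List[str]:
    """Build the argument list for `flowgrid worker` from the documented flags."""
    cmd = ["flowgrid", "worker", "-A", app]
    if concurrency is not None:
        cmd += ["--concurrency", str(concurrency)]
    if loglevel is not None:
        cmd += ["--loglevel", loglevel]
    return cmd


command = worker_command("examples.01-base.fg", concurrency=4, loglevel="info")
# shlex.join renders the list as a copy-pasteable shell line.
print(shlex.join(command))
```

The list form can be passed directly to subprocess.run, which avoids shell quoting issues.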

Example

Here's a basic example that demonstrates how to define and launch tasks using FlowGrid:

import time
from flowgrid import FlowGrid

fg = FlowGrid()

@fg.task
def add(x: float, y: float) -> float:
    time.sleep(10)
    return x + y

def main():
    task = add(1, 2)
    print('TASK:', task.task_id)
    response = fg.wait(task)
    print('RESPONSE:', response)

if __name__ == '__main__':
    main()

Configuration

You can configure FlowGrid by setting environment variables. Here are all keys and their default values:

  • FLOWGRID_CELERY_BROKER_URL=amqp://guest:guest@localhost:5672//
  • FLOWGRID_CELERY_RESULT_BACKEND=redis://localhost:6380/0
  • FLOWGRID_SERIALIZER=json
  • FLOWGRID_TASK_SERIALIZER=json
  • FLOWGRID_RESULT_SERIALIZER=json
  • FLOWGRID_ACCEPT_CONTENT=json
  • FLOWGRID_TIMEZONE=UTC
  • FLOWGRID_ENABLE_UTC=True

The serializer, task serializer, result serializer, and accept content settings can each be set to json, pickle, or msgpack. FLOWGRID_SERIALIZER is a global setting: its value is used for any of the more specific serializer settings that is not set.
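
Reading the fallback rule as "a specific setting that is unset takes the value of FLOWGRID_SERIALIZER", the resolution can be sketched as follows. This is an illustration of the rule as described, not FlowGrid's own code; resolve_serializers is a hypothetical helper.

```python
from typing import Dict, Mapping

DEFAULT_SERIALIZER = "json"
SPECIFIC_KEYS = (
    "FLOWGRID_TASK_SERIALIZER",
    "FLOWGRID_RESULT_SERIALIZER",
    "FLOWGRID_ACCEPT_CONTENT",
)


def resolve_serializers(env: Mapping[str, str]) -> Dict[str, str]:
    """Apply the global serializer to every specific key that is not set."""
    fallback = env.get("FLOWGRID_SERIALIZER", DEFAULT_SERIALIZER)
    return {key: env.get(key, fallback) for key in SPECIFIC_KEYS}


resolved = resolve_serializers(
    {
        "FLOWGRID_SERIALIZER": "msgpack",
        "FLOWGRID_RESULT_SERIALIZER": "pickle",
    }
)
for key, value in resolved.items():
    print(f"{key}={value}")
```

Here only the result serializer is overridden; the task serializer and accepted content both inherit msgpack from the global setting.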

The timezone can be set to any valid timezone. The FLOWGRID_ENABLE_UTC variable can be set to True or False (case-insensitive).
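
Case-insensitive handling of FLOWGRID_ENABLE_UTC can be pictured with a small parser. This is a sketch under assumptions: enable_utc is a hypothetical helper, and rejecting values other than true or false with an error is a choice made here, not documented FlowGrid behaviour.

```python
import os
from typing import Mapping


def enable_utc(env: Mapping[str, str] = os.environ) -> bool:
    """Parse FLOWGRID_ENABLE_UTC, accepting True/False in any letter case."""
    raw = env.get("FLOWGRID_ENABLE_UTC", "True").strip().lower()
    if raw == "true":
        return True
    if raw == "false":
        return False
    # Assumption: anything else is treated as a configuration error.
    raise ValueError(f"FLOWGRID_ENABLE_UTC must be True or False, got {raw!r}")


print(enable_utc({"FLOWGRID_ENABLE_UTC": "TRUE"}))
print(enable_utc({"FLOWGRID_ENABLE_UTC": "false"}))
print(enable_utc({}))
```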

Future Enhancements

  • Extended Documentation: More detailed documentation and examples will be added as the project evolves.
  • Support chaining tasks without waiting: Currently, you have to wait for a task to finish before chaining another one; see examples/06-chaining.py for how to chain tasks. Once this is supported, chaining will look exactly the same, but fg.launch will return immediately, so you can hand the task back to the user right away.
  • Support task sequences without data dependencies: Right now you can chain tasks only by passing one task as a parameter to another, which does not cover the case where the next task does not need the previous task's result. A planned interface, fg.sequence, will let you define a sequence of tasks that are executed in order.

License

FlowGrid is licensed under the MIT License. See LICENSE for more information.

Contributing

Contributions are welcome! Please fork the repository and submit a pull request to contribute to FlowGrid.

Contact

For any questions or inquiries, please contact the maintainers via [your email/contact information].
