Skip to main content

Propan framework: the simplest way to work with a messaging queues

Project description

Propan logo

Tests coverage Coverage Package version downloads
Supported Python versions GitHub

Propan

Propan - just an another one HTTP a declarative Python MQ framework. It's following by fastapi, simplify Message Brokers around code writing and provides a helpful development toolkit, which existed only in HTTP-frameworks world until now.

It's designed to create reactive microservices around Messaging Architecture.

It is a modern, high-level framework on top of popular specific Python brokers libraries, based on pydantic and fastapi, pytest concepts.


Documentation: https://lancetnik.github.io/Propan/


The key features are

  • Simple: Designed to be easy to use and learn.
  • Intuitive: Great editor support. Autocompletion everywhere.
  • Dependencies management: Minimization of code duplication. Access to dependencies at any level of the call stack.
  • Integrations: Propan is fully compatible with any HTTP framework you want
  • MQ independent: Single interface to popular MQ:
  • RPC: The framework supports RPC requests over MQ, which will allow performing long operations on remote services asynchronously.
  • Great to develop: CLI tool provides great development experience:
    • framework-independent way to manage the project environment
    • application code hot reload
    • robust application templates
  • Testability: Propan allows you to test your app without external dependencies: you do not have to set up a Message Broker, you can use a virtual one!

Supported MQ brokers

async sync
RabbitMQ :heavy_check_mark: stable :heavy_check_mark: :mag: planning :mag:
Redis :heavy_check_mark: stable :heavy_check_mark: :mag: planning :mag:
Nats :heavy_check_mark: stable :heavy_check_mark: :mag: planning :mag:
Kafka :warning: beta :warning: :mag: planning :mag:
SQS :warning: beta :warning: :mag: planning :mag:
NatsJS :hammer_and_wrench: in progress :hammer_and_wrench: :mag: planning :mag:
MQTT :mag: planning :mag: :mag: planning :mag:
Redis Streams :mag: planning :mag: :mag: planning :mag:
Pulsar :mag: planning :mag: :mag: planning :mag:

Community

If you are interested in this project, please give me feedback by star or/and watch repository.

If you have any questions or ideas about features to implement, welcome to discussions.


Declarative?

With declarative tools you can define what you need to get. With traditional imperative tools you must write what you need to do.

Take a look at classic imperative tools, such as aio-pika, pika, redis-py, nats-py, etc.

This is the Quickstart with the aio-pika:

import asyncio
import aio_pika

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

    queue_name = "test_queue"

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue(queue_name)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

asyncio.run(main())

aio-pika is a great tool with a really easy learning curve. But it's still imperative. You need to connect, declare channel, queues, exchanges by yourself. Also, you need to manage connection, message, queue context to avoid any troubles.

It is not a bad way, but it can be much easier.

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(broker)

@broker.handle("test_queue")
async def base_handler(body):
    print(body)

This is the Propan declarative way to write the same code. That is so much easier, isn't it?


Quickstart

Install using pip:

pip install "propan[async-rabbit]"
# or
pip install "propan[async-nats]"
# or
pip install "propan[async-redis]"
# or
pip install "propan[async-kafka]"
# or
pip install "propan[async-sqs]"

Basic usage

Create an application with the following code at serve.py:

from propan import PropanApp
from propan import RabbitBroker
# from propan import RedisBroker
# from propan import NatsBroker
# from propan import SQSBroker
# from propan import KafkaBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
# broker = NatsBroker("nats://localhost:4222")
# broker = RedisBroker("redis://localhost:6379")
# broker = SQSBroker("http://localhost:9324", ...)
# broker = KafkaBroker("localhost:9092")

app = PropanApp(broker)

@broker.handle("test")
async def base_handler(body):
    '''Handle all default exchange messages with `test` routing key'''
    print(body)

And just run it:

propan run serve:app

Type casting

Propan uses pydantic to cast incoming function arguments to types according to their annotation.

from pydantic import BaseModel
from propan import PropanApp, Context, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

class SimpleMessage(BaseModel):
    key: int

@broker.handle("test2")
async def second_handler(body: SimpleMessage):
    assert isinstance(body.key, int)

Dependencies

Propan a has dependencies management policy close to pytest fixtures. You can specify in functions arguments which dependencies you would to use. Framework passes them from the global Context object.

Already existed context fields are: app, broker, context (itself), logger and message. If you call not existing field, raises pydantic.ValidationError value.

But you can specify your own dependencies, call dependencies functions (like Fastapi Depends) and more.

import aio_pika
from propan import PropanApp, RabbitBroker, Context, Depends

rabbit_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(rabbit_broker)

async def dependency(body: dict) -> bool:
    return True

@rabbit_broker.handle("test")
async def base_handler(body: dict,
                       dep: bool = Depends(dependency),
                       broker: RabbitBroker = Context()):
    assert dep is True
    assert broker is rabbit_broker

CLI power

Propan has its own CLI tool that provided the following features:

  • project generation
  • multiprocessing workers
  • project hot reloading
  • custom command line arguments passing

Context passing

For example: pass your current .env project setting to context

propan run serve:app --env=.env.dev
from propan import PropanApp, RabbitBroker
from propan.annotations import ContextRepo
from pydantic import BaseSettings

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = PropanApp(broker)

class Settings(BaseSettings):
    ...

@app.on_startup
async def setup(env: str, context: ContextRepo):
    settings = Settings(_env_file=env)
    context.set_global("settings", settings)

Project template

Also, Propan CLI is able to generate a production-ready application template:

propan create async rabbit [projectname]

Notice: project template require pydantic[dotenv] installation.

Run the created project:

# Run rabbimq first
docker compose --file [projectname]/docker-compose.yaml up -d

# Run project
propan run [projectname].app.serve:app --env=.env --reload

Now you can enjoy a new development experience!


HTTP Frameworks integrations

Any Framework

You can use Propan MQBrokers without PropanApp. Just start and stop them according to your application lifespan.

from propan import NatsBroker
from sanic import Sanic

app = Sanic("MyHelloWorldApp")
broker = NatsBroker("nats://localhost:4222")

@broker.handle("test")
async def base_handler(body):
    print(body)

@app.after_server_start
async def start_broker(app, loop):
    await broker.start()

@app.after_server_stop
async def stop_broker(app, loop):
    await broker.close()

FastAPI Plugin

Also, Propan can be used as part of FastAPI.

Just import a PropanRouter you need and declare the message handler using the @event decorator. This decorator is similar to the decorator @handle for the corresponding brokers.

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from propan.fastapi import RabbitRouter

app = FastAPI()

router = RabbitRouter("amqp://guest:guest@localhost:5672")

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.event("test")
async def hello(m: Incoming, d = Depends(call)) -> dict:
    return { "response": "Hello, Propan!" }

app.include_router(router)

Examples

To see more framework usages go to examples/

Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

propan-0.1.2.4.tar.gz (52.1 kB view details)

Uploaded Source

Built Distribution

propan-0.1.2.4-py3-none-any.whl (80.6 kB view details)

Uploaded Python 3

File details

Details for the file propan-0.1.2.4.tar.gz.

File metadata

  • Download URL: propan-0.1.2.4.tar.gz
  • Upload date:
  • Size: 52.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for propan-0.1.2.4.tar.gz
Algorithm Hash digest
SHA256 bf8a03b69b99546e05ddb5c0f3fe0bb814621998f4e5243a6de0535d49ad71e6
MD5 ecd1084647218355ef6f14c98d930f22
BLAKE2b-256 ee60588b7a99fa7f957dda5b47c2e09ca05d80d883712aebf3ea1db5eee0b80f

See more details on using hashes here.

File details

Details for the file propan-0.1.2.4-py3-none-any.whl.

File metadata

  • Download URL: propan-0.1.2.4-py3-none-any.whl
  • Upload date:
  • Size: 80.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for propan-0.1.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 47620fd989b1f799afa099654b97b7040b719149ef1d349fc6e91f1efd06f96a
MD5 81b57a2d6bdd6a1ebd98a0edafb284ce
BLAKE2b-256 ce4e25f77b2635d0c688309077827bb4275360682795f7a85cdce4dc7eb02fae

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page