asyncapi-python

Python library that translates an AsyncAPI specification into Python code, without code generation.


AsyncAPI Pattern: https://asyncapi.io


Documentation: https://dutradda.github.io/asyncapi-python/

Source Code: https://github.com/dutradda/asyncapi-python


Key Features

  • Reads an AsyncAPI specification and creates publishers and subscribers from it

  • Supports declaring the specification with dataclasses

  • Provides an application for creating subscribers

  • Supports the kafka, redis and postgres protocols (the same as the broadcaster library)

  • Extra support for the Google Cloud Pub/Sub service

  • Exposes the auto-generated specification over HTTP
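The first feature, creating subscribers from a specification without generating code, can be pictured as a lookup by `operationId`. The sketch below illustrates that general idea only; it is not this library's implementation, and `resolve_handlers` and the in-memory `user_events` module are hypothetical names introduced for the example.

```python
import types
from typing import Any, Callable, Dict


def resolve_handlers(
    spec: Dict[str, Any], module: types.ModuleType
) -> Dict[str, Callable[..., Any]]:
    """Map each channel with a named subscribe operation to a function.

    Hypothetical helper: the function is looked up in `module` by the
    operation's `operationId`, so no source code has to be generated.
    """
    handlers: Dict[str, Callable[..., Any]] = {}
    for channel_name, channel in spec.get("channels", {}).items():
        operation = channel.get("subscribe")
        if not operation or "operationId" not in operation:
            continue  # channel has no named subscribe operation
        handlers[channel_name] = getattr(module, operation["operationId"])
    return handlers


# Stand-in for a user module such as user_events.py
user_events = types.ModuleType("user_events")


async def receive_user_update(message: Any) -> None:
    print(f"Received update for user id={message.id}")


user_events.receive_user_update = receive_user_update

spec = {
    "channels": {
        "user/update": {
            "subscribe": {"operationId": "receive_user_update"},
            "publish": {},
        }
    }
}

handlers = resolve_handlers(spec, user_events)
print(sorted(handlers))
```

A missing function raises `AttributeError` at lookup time, which surfaces a mismatch between the specification and the module early.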

Requirements

  • Python 3.8+

  • broadcaster

  • jsondaora

  • requests (optional, for HTTP specifications)

  • typer (optional, for the subscriber application)

  • pyyaml (optional, for YAML specifications)

  • apidaora (optional, for exposing the specification)

  • Package extras:

    • http
    • yaml
    • kafka
    • redis
    • postgres
    • subscriber
    • docs
    • google-cloud-pubsub

Installation

$ pip install asyncapi[http,yaml,redis,subscriber,docs]

YAML Specification Example

asyncapi: 2.0.0

info:
  title: User API
  version: '1.0.0'
  description: API to manage users

servers:
  development:
    url: localhost
    protocol: redis
    description: Development Broker Server

channels:
  user/update:
    description: Topic for user updates
    subscribe:
      operationId: receive_user_update
      message:
        $ref: '#/components/messages/UserUpdate'
    publish:
      message:
        $ref: '#/components/messages/UserUpdate'

components:
  messages:
    UserUpdate:
      name: userUpdate
      title: User Update
      summary: Inform about users updates
      payload:
        type: object
        required:
          - id
        properties:
          id:
            type: string
          name:
            type: string
          age:
            type: integer

defaultContentType: application/json
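Both operations in the specification above point at the same message through `$ref: '#/components/messages/UserUpdate'`. As a rough illustration of how such a local reference can be followed, here is a minimal sketch over a plain dict that mirrors part of the YAML. `resolve_ref` is a hypothetical helper, not part of this library, and it only handles references inside the same document.

```python
from typing import Any, Dict


def resolve_ref(document: Dict[str, Any], ref: str) -> Any:
    """Resolve a local reference such as '#/components/messages/UserUpdate'."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are handled here: {ref!r}")
    node: Any = document
    for key in ref[2:].split("/"):
        # JSON Pointer escapes: '~1' stands for '/', '~0' stands for '~'
        key = key.replace("~1", "/").replace("~0", "~")
        node = node[key]
    return node


# Dict form of a fragment of the YAML specification
document = {
    "channels": {
        "user/update": {
            "subscribe": {
                "operationId": "receive_user_update",
                "message": {"$ref": "#/components/messages/UserUpdate"},
            }
        }
    },
    "components": {
        "messages": {
            "UserUpdate": {
                "name": "userUpdate",
                "payload": {"type": "object", "required": ["id"]},
            }
        }
    },
}

ref = document["channels"]["user/update"]["subscribe"]["message"]["$ref"]
message = resolve_ref(document, ref)
print(message["name"])  # prints: userUpdate
```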

Creating the subscribers module

# user_events.py

from typing import Any


async def receive_user_update(message: Any) -> None:
    print(f"Received update for user id={message.id}")
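Because the handler is an ordinary coroutine function, it can be exercised without a broker. The sketch below assumes, based on the `message.id` access in the handler, that the received message exposes payload fields as attributes; `SimpleNamespace` imitates that here and is not what the library actually passes in.

```python
import asyncio
from types import SimpleNamespace
from typing import Any


async def receive_user_update(message: Any) -> None:
    print(f"Received update for user id={message.id}")


# Assumption: payload fields are reachable as attributes on the message.
fake_message = SimpleNamespace(id="fake-user", name="Fake User", age=33)

# Drive the coroutine directly, with no subscriber application involved
asyncio.run(receive_user_update(fake_message))
```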

Start the subscriber to listen for events

PYTHONPATH=. asyncapi-subscriber \
    --url api-spec.yaml \
    --api-module user_events
Waiting messages...

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api


api = build_api('api-spec.yaml')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")

python publish.py


Published update for user=fake-user
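In `publish.py`, `api.disconnect()` is not reached if `api.publish()` raises. One possible hardening is to wrap the publish call in `try`/`finally`. The sketch below uses a hypothetical `FakeApi` stand-in that only borrows the three method names used above, so it runs without a broker; it is not the library's class, and `publish_safely` is likewise a name invented for the example.

```python
import asyncio
from typing import Any, List


class FakeApi:
    """Hypothetical stand-in exposing connect/publish/disconnect."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    async def connect(self) -> None:
        self.calls.append("connect")

    async def publish(self, channel_id: str, message: Any) -> None:
        self.calls.append("publish")
        raise RuntimeError("broker unavailable")  # simulate a failure

    async def disconnect(self) -> None:
        self.calls.append("disconnect")


async def publish_safely(api: Any, channel_id: str, message: Any) -> None:
    await api.connect()
    try:
        await api.publish(channel_id, message)
    finally:
        # Runs whether publish succeeded or raised
        await api.disconnect()


api = FakeApi()
try:
    asyncio.run(publish_safely(api, "user/update", {"id": "fake-user"}))
except RuntimeError as error:
    print(f"publish failed: {error}")

print(api.calls)
```

The recorded calls show that the connection is closed even though publishing failed.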

Receive Updates

Waiting messages...
Received update for user id=fake-user

Expose Specification

asyncapi-docs --path api-spec.yaml
curl -i localhost:5000/asyncapi.yaml

Python Specification Example

# specification.py

import dataclasses
from typing import Optional

import asyncapi


@dataclasses.dataclass
class UserUpdatePayload:
    id: str
    name: Optional[str] = None
    age: Optional[int] = None


dev_server = asyncapi.Server(
    url='localhost',
    protocol=asyncapi.ProtocolType.REDIS,
    description='Development Broker Server',
)
message = asyncapi.Message(
    name='userUpdate',
    title='User Update',
    summary='Inform about users updates',
    payload=UserUpdatePayload,
)
user_update_channel = asyncapi.Channel(
    description='Topic for user updates',
    subscribe=asyncapi.Operation(
        operation_id='receive_user_update', message=message,
    ),
    publish=asyncapi.Operation(message=message),
)

spec = asyncapi.Specification(
    info=asyncapi.Info(
        title='User API', version='1.0.0', description='API to manage users',
    ),
    servers={'development': dev_server},
    channels={'user/update': user_update_channel},
    components=asyncapi.Components(messages={'UserUpdate': message}),
)
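The `UserUpdatePayload` dataclass plays the role of the `payload` schema in the YAML example: fields without a default correspond to `required`, and the annotations correspond to the property types. The sketch below shows one way such a mapping could be derived with the standard library. `dataclass_to_schema` and `JSON_TYPES` are hypothetical names, only a few scalar types are covered, and this is not the library's own conversion code.

```python
import dataclasses
from typing import Any, Dict, Optional, Union, get_args, get_origin, get_type_hints

# Hypothetical mapping from Python scalar types to JSON Schema type names
JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


@dataclasses.dataclass
class UserUpdatePayload:
    id: str
    name: Optional[str] = None
    age: Optional[int] = None


def dataclass_to_schema(cls: type) -> Dict[str, Any]:
    """Build a JSON-Schema-like dict from a dataclass with scalar fields."""
    hints = get_type_hints(cls)
    properties: Dict[str, Any] = {}
    required = []
    for field in dataclasses.fields(cls):
        hint = hints[field.name]
        if get_origin(hint) is Union:
            # Optional[X] is Union[X, None]; keep the non-None member
            hint = next(arg for arg in get_args(hint) if arg is not type(None))
        properties[field.name] = {"type": JSON_TYPES[hint]}
        has_default = (
            field.default is not dataclasses.MISSING
            or field.default_factory is not dataclasses.MISSING
        )
        if not has_default:
            required.append(field.name)
    return {"type": "object", "required": required, "properties": properties}


schema = dataclass_to_schema(UserUpdatePayload)
print(schema["required"])  # prints: ['id']
```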

Creating the subscribers module

# py_spec_user_events.py

import specification


spec = specification.spec


async def receive_user_update(
    message: specification.UserUpdatePayload,
) -> None:
    print(f"Received update for user id={message.id}")

Start the subscriber to listen for events

PYTHONPATH=. asyncapi-subscriber --api-module py_spec_user_events
Waiting messages...

Publishing Updates

# publish.py

import asyncio

from asyncapi import build_api_auto_spec


api = build_api_auto_spec('specification')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)


async def publish() -> None:
    await api.connect()
    await api.publish(channel_id, message)
    await api.disconnect()


asyncio.run(publish())

print(f"Published update for user={message.id}")

python publish.py


Published update for user=fake-user

Receive Updates

Waiting messages...
Received update for user id=fake-user

Expose Specification

PYTHONPATH=. asyncapi-docs --api-module specification
curl -i localhost:5000/asyncapi.yaml
