asyncapi
Project description
Python AsyncAPI
Python library for translate asyncapi specification to python code, without code generation.
AsyncAPI Pattern: https://asyncapi.io
Documentation: https://dutradda.github.io/asyncapi-python/
Source Code: https://github.com/dutradda/asyncapi-python
Key Features
-
Reads an asyncapi specification and create publishers and subscribers from it
-
Support for specification declaration with dataclasses
-
Provides application for create subscribers
-
Support for kafka, redis and postgres protocols (same as broadcaster library)
-
Extra support for google cloud pubsub service
-
Expose in http the auto-generated specification
Requirements
-
Python 3.8+
-
broadcaster
-
jsondaora
-
requests (Optional for http specification)
-
typer (Optional for subscriber application)
-
pyyaml (Optional for yaml specification)
-
apidaora (Optional for expose specification)
-
Package extra installs:
- http
- yaml
- kafka
- redis
- postgres
- subscriber
- docs
- google-cloud-pubsub
Installation
$ pip install asyncapi[http,yaml,redis,subscriber,docs]
YAML Specification Example
asyncapi: 2.0.0
info:
title: User API
version: '1.0.0'
description: API to manage users
servers:
development:
url: localhost
protocol: redis
description: Development Broker Server
channels:
user/update:
description: Topic for user updates
subscribe:
operationId: receive_user_update
message:
$ref: '#/components/messages/UserUpdate'
publish:
message:
$ref: '#/components/messages/UserUpdate'
components:
messages:
UserUpdate:
name: userUpdate
title: User Update
summary: Inform about users updates
payload:
type: object
required:
- id
properties:
id:
type: string
name:
type: string
age:
type: integer
defaultContentType: application/json
Creating subscribers module
# user_events.py
from typing import Any
async def receive_user_update(message: Any) -> None:
print(f"Received update for user id={message.id}")
Start subscriber to listen events
PYTHONPATH=. asyncapi-subscriber \
--url api-spec.yaml \
--api-module user_events
Waiting messages...
Publishing Updates
# publish.py
import asyncio
from asyncapi import build_api
api = build_api('api-spec.yaml')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)
async def publish() -> None:
await api.connect()
await api.publish(channel_id, message)
await api.disconnect()
asyncio.run(publish())
print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
Receive Updates
Waiting messages...
Received update for user id=fake-user
Expose Specification
asyncapi-docs --path api-spec.yaml
curl -i localhost:5000/asyncapi.yaml
Python Specification Example
# specification.py
import dataclasses
from typing import Optional
import asyncapi
@dataclasses.dataclass
class UserUpdatePayload:
id: str
name: Optional[str] = None
age: Optional[int] = None
dev_server = asyncapi.Server(
url='localhost',
protocol=asyncapi.ProtocolType.REDIS,
description='Development Broker Server',
)
message = asyncapi.Message(
name='userUpdate',
title='User Update',
summary='Inform about users updates',
payload=UserUpdatePayload,
)
user_update_channel = asyncapi.Channel(
description='Topic for user updates',
subscribe=asyncapi.Operation(
operation_id='receive_user_update', message=message,
),
publish=asyncapi.Operation(message=message),
)
spec = asyncapi.Specification(
info=asyncapi.Info(
title='User API', version='1.0.0', description='API to manage users',
),
servers={'development': dev_server},
channels={'user/update': user_update_channel},
components=asyncapi.Components(messages={'UserUpdate': message}),
)
Creating subscribers module
# py_spec_user_events.py
import specification
spec = specification.spec
async def receive_user_update(
message: specification.UserUpdatePayload,
) -> None:
print(f"Received update for user id={message.id}")
Start subscriber to listen events
PYTHONPATH=. asyncapi-subscriber --api-module user_events
Waiting messages...
Publishing Updates
# publish.py
import asyncio
from asyncapi import build_api_auto_spec
api = build_api_auto_spec('specification')
channel_id = 'user/update'
message = api.payload(channel_id, id='fake-user', name='Fake User', age=33)
async def publish() -> None:
await api.connect()
await api.publish(channel_id, message)
await api.disconnect()
asyncio.run(publish())
print(f"Published update for user={message.id}")
python publish.py
Published update for user=fake-user
Receive Updates
Waiting messages...
Received update for user id=fake-user
Expose Specification
PYTHONPATH=. asyncapi-docs --api-module specification
curl -i localhost:5000/asyncapi.yaml
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file asyncapi-0.14.1.tar.gz
.
File metadata
- Download URL: asyncapi-0.14.1.tar.gz
- Upload date:
- Size: 102.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.24.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5e0fc9b2ec4c7eff465f257fbaca4632a8cb034e2c0182961deffcfaedeb5446 |
|
MD5 | 96ff242b2d01fd1b6ef6c4a03d315eec |
|
BLAKE2b-256 | 6f365a01fd87d3b1cb7943456cfe8c5a46d286d7d08f5b4f3defd4fd2e464cba |
File details
Details for the file asyncapi-0.14.1-py3-none-any.whl
.
File metadata
- Download URL: asyncapi-0.14.1-py3-none-any.whl
- Upload date:
- Size: 111.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-requests/2.24.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6d95fc5df0dcdbc65d050e5b38c18e31d7a967d0e757ef268e398ab96c20fd8f |
|
MD5 | 4e3de4fb437f6f1726cd7afa6cab39b2 |
|
BLAKE2b-256 | ea15f137449071004b0aaeaaaf89455d12d443b1131a0fd6007527f91863fa25 |