A wrapper around FastAPI that lets you consume Kafka messages as routes
Usage/Examples
Under the hood, FastAPI Kafka uses AIOKafka to create a consumer. You configure the consumer with the following class, whose fields match the constructor inputs of AIOKafkaConsumer exactly:
class KafkaConsumerConfig:
    loop: Optional[asyncio.AbstractEventLoop] = None
    bootstrap_servers: str = "localhost"
    client_id: Optional[str] = None
    group_id: Optional[str] = None
    group_instance_id: Optional[str] = None
    key_deserializer: Optional[Any] = None
    value_deserializer: Optional[Any] = None
    fetch_max_wait_ms: int = 500
    fetch_max_bytes: int = 52428800  # 50 MB
    fetch_min_bytes: int = 1
    max_partition_fetch_bytes: int = 1 * 1024 * 1024  # 1 MB
    request_timeout_ms: int = 40 * 1000  # 40 seconds
    retry_backoff_ms: int = 100
    auto_offset_reset: str = "latest"
    enable_auto_commit: bool = True
    auto_commit_interval_ms: int = 5000
    check_crcs: bool = True
    metadata_max_age_ms: int = 5 * 60 * 1000  # 5 minutes
    partition_assignment_strategy: tuple[Any, ...] = (RoundRobinPartitionAssignor,)
    max_poll_interval_ms: int = 300000  # 5 minutes
    rebalance_timeout_ms: Optional[int] = None
    session_timeout_ms: int = 10000  # 10 seconds
    heartbeat_interval_ms: int = 3000  # 3 seconds
    consumer_timeout_ms: int = 200  # 200 ms
    max_poll_records: Optional[int] = None
    ssl_context: Optional[ssl.SSLContext] = None
    security_protocol: str = "PLAINTEXT"
    api_version: str = "auto"
    exclude_internal_topics: bool = True
    connections_max_idle_ms: int = 540000  # 9 minutes
    isolation_level: str = "read_uncommitted"
    sasl_mechanism: str = "PLAIN"
    sasl_plain_password: Optional[str] = None
    sasl_plain_username: Optional[str] = None
    sasl_kerberos_service_name: str = "kafka"
    sasl_kerberos_domain_name: Optional[str] = None
    sasl_oauth_token_provider: Optional[Any] = None
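Because the field names mirror the keyword arguments of AIOKafkaConsumer, a config object can in principle be unpacked straight into the consumer constructor. The sketch below illustrates that idea only. MiniConsumerConfig and to_consumer_kwargs are hypothetical names, the config is assumed to be a dataclass, and none of this is taken from FastAPI Kafka's source.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class MiniConsumerConfig:
    """Hypothetical stand-in holding a few of the fields listed above."""
    bootstrap_servers: str = "localhost"
    group_id: Optional[str] = None
    auto_offset_reset: str = "latest"
    enable_auto_commit: bool = True


def to_consumer_kwargs(config: Any) -> Dict[str, Any]:
    """Collect the dataclass fields into a keyword-argument dict (shallow, no copying)."""
    return {f.name: getattr(config, f.name) for f in fields(config)}


kwargs = to_consumer_kwargs(
    MiniConsumerConfig(bootstrap_servers="localhost:9092", group_id="demo")
)
# With aiokafka installed, the dict could be passed on like this:
#     consumer = AIOKafkaConsumer("test-topic", **kwargs)
```

A shallow dict is used rather than dataclasses.asdict because asdict deep-copies values, which is undesirable for objects such as an SSL context.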
Simple Example
Here's a full example that connects to a Kafka server running locally:
from typing import Any

from pydantic import BaseModel

from fastapi_kafka import FastAPIKafka
from fastapi_kafka.consumer import KafkaConsumerConfig


class SomeMessage(BaseModel):
    test: str


app = FastAPIKafka(
    kafka_consumer_config=KafkaConsumerConfig(
        bootstrap_servers="localhost:9092"
    )
)


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.consumer("test-topic")
async def read_item(item: Any):
    return item


@app.consumer("test-topic-model")
async def read_item_model(item: SomeMessage):
    return item
FastAPIKafka also accepts all of the standard FastAPI constructor inputs.
To define a consumer for a topic, use @app.consumer("<topic name>"). If you annotate the Kafka message as Any, the handler receives the message as a UTF-8 decoded string when reading the item.
If you use JSON messages, you can define a pydantic model to read them, much the same as in a regular HTTP route: the message is transformed into an instance of that model.
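The two decoding behaviors described above can be pictured with a small standard-library sketch. It is an illustration of the described behavior, not FastAPI Kafka's actual implementation: decode_message is a hypothetical helper, and a dataclass stands in for the pydantic model so the snippet runs without extra dependencies.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional, Type


@dataclass
class SomeMessage:
    """Stand-in for the pydantic model used in the example above."""
    test: str


def decode_message(raw: bytes, model: Optional[Type[Any]] = None) -> Any:
    """Decode a raw Kafka message value.

    Without a model the UTF-8 text is returned; with a model the text is
    parsed as JSON and its keys are passed to the model as keyword arguments.
    """
    text = raw.decode("utf-8")
    if model is None:
        return text
    return model(**json.loads(text))


plain = decode_message(b"hello")                         # handler typed as Any
typed = decode_message(b'{"test": "abc"}', SomeMessage)  # handler typed as a model
```

A pydantic BaseModel can be constructed the same way with keyword arguments, and it additionally validates the field types.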
More Advanced Example (Using Upstash Kafka)
from typing import Any
from uuid import uuid4

from pydantic import BaseModel

from fastapi_kafka import FastAPIKafka, create_ssl_context
from fastapi_kafka.consumer import KafkaConsumerConfig


class SomeMessage(BaseModel):
    test: str


consumer_config = KafkaConsumerConfig(
    bootstrap_servers='example-us1-kafka.upstash.io:9092',
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    ssl_context=create_ssl_context(),
    sasl_plain_username='xxx',
    sasl_plain_password='xxx',
    client_id=f'consumer-{uuid4()}',
    group_id='consumer-group-api',
    auto_offset_reset='earliest',
)

app = FastAPIKafka(kafka_consumer_config=consumer_config)


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.consumer("test-topic")
async def read_item(item: Any):
    return item


@app.consumer("test-topic-model")
async def read_item_model(item: SomeMessage):
    return item
When security_protocol is SASL_SSL, you must pass an SSL context, such as the one returned by create_ssl_context(), as ssl_context.
For even more advanced configuration, such as defining your own certificates, more information can be found here: AIOKafka SSL.
FastAPI Lifespan
One material change from plain FastAPI is how Lifespan works. FastAPI Kafka initializes the Lifespan automatically to create the defined Kafka consumer. If you need additional start-up or shutdown behavior, pass startup_functions or shutdown_functions when initializing the FastAPIKafka instance.
Both fields accept a list of synchronous or asynchronous functions. Start-up functions are executed before the yield in the lifespan, and shutdown functions after it.
async def my_startup_actions():
    await do_something()
    do_something_sync()


async def my_shutdown_actions():
    await do_something_on_shutdown()


app = FastAPIKafka(
    kafka_consumer_config=consumer_config,
    startup_functions=[my_startup_actions],
    shutdown_functions=[my_shutdown_actions]
)
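To make the before/after-yield behavior concrete, here is a rough sketch of how a lifespan context manager could run a mixed list of synchronous and asynchronous functions. It is an assumption about the general pattern, not FastAPI Kafka's code: lifespan_sketch and run_hooks are hypothetical names, and the app argument and the Kafka consumer set-up are left out.

```python
import asyncio
import inspect
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable, List, Sequence

Hook = Callable[[], Any]


async def run_hooks(hooks: Sequence[Hook]) -> None:
    """Call each hook in order, awaiting the result when it is awaitable."""
    for hook in hooks:
        result = hook()
        if inspect.isawaitable(result):
            await result


@asynccontextmanager
async def lifespan_sketch(
    startup_functions: Sequence[Hook] = (),
    shutdown_functions: Sequence[Hook] = (),
) -> AsyncIterator[None]:
    await run_hooks(startup_functions)  # before the yield: start-up
    try:
        yield  # the application would serve requests here
    finally:
        await run_hooks(shutdown_functions)  # after the yield: shutdown


events: List[str] = []


async def async_startup() -> None:
    events.append("async startup")


def sync_startup() -> None:
    events.append("sync startup")


async def async_shutdown() -> None:
    events.append("shutdown")


async def main() -> None:
    async with lifespan_sketch([async_startup, sync_startup], [async_shutdown]):
        events.append("running")


asyncio.run(main())
```

Checking the return value with inspect.isawaitable lets one list hold both plain functions and coroutine functions, which matches the "synchronous or asynchronous" wording above.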