Kafka event source mapping adaptor for AMGI applications
amgi-kafka-event-source-mapping
amgi-kafka-event-source-mapping is an adaptor for AMGI applications to run in a Kafka event source mapped environment.
Installation
pip install amgi-kafka-event-source-mapping==0.34.0
Example
This example uses AsyncFast:
from dataclasses import dataclass

from amgi_kafka_event_source_mapping import KafkaEventSourceMappingHandler
from asyncfast import AsyncFast

app = AsyncFast()

@dataclass
class Order:
    item_ids: list[str]

@app.channel("orders")
async def orders(order: Order) -> None:
    # Makes an order
    ...

handler = KafkaEventSourceMappingHandler(app)
What it does
- Converts Kafka batch events into AMGI message.receive events
- Uses the Kafka topic name as the AMGI message address
- Supports partial batch failures so only failed records are reported
- Sends outbound messages to Kafka using an async producer
- Outbound messages are sent via the same Kafka broker (bootstrap servers) that the records were received from
- Optionally manages application startup and shutdown via AMGI lifespan
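To make the batch-to-message mapping above more concrete, here is a minimal sketch of walking a Kafka event source mapping payload and using each record's topic as the message address. The function name `iter_receive_messages` and the shape of the yielded dictionaries are assumptions made for illustration, not the adaptor's actual internals. The event layout (a `records` mapping keyed by topic and partition) follows the format Lambda uses for Kafka event sources.

```python
from typing import Any, Iterator


def iter_receive_messages(event: dict[str, Any]) -> Iterator[dict[str, Any]]:
    """Yield one simplified message per Kafka record in a batch event.

    The yielded dict shape is hypothetical and only illustrates the mapping.
    """
    # "records" maps "<topic>-<partition>" to a list of records.
    for records in event["records"].values():
        for record in records:
            yield {
                "type": "message.receive",
                "address": record["topic"],  # topic name becomes the address
                "partition": record["partition"],
                "offset": record["offset"],
            }


# A trimmed-down sample event; real events carry more fields per record.
sample_event = {
    "eventSource": "aws:kafka",
    "bootstrapServers": "broker-1.example.com:9092",
    "records": {
        "orders-0": [
            {"topic": "orders", "partition": 0, "offset": 15, "value": "e30="},
            {"topic": "orders", "partition": 0, "offset": 16, "value": "e30="},
        ],
        "payments-1": [
            {"topic": "payments", "partition": 1, "offset": 3, "value": "e30="},
        ],
    },
}

messages = list(iter_receive_messages(sample_event))
```

Each record in the batch produces its own message, so a handler registered for the `orders` channel would see the two `orders` records and none of the `payments` ones.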
Record handling
- Record values and keys are passed to your app as bytes
- Kafka record headers become AMGI headers
- Records are only acknowledged when your app emits message.ack
- Records that emit message.nack or are not acknowledged are treated as failures
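The sketch below illustrates what "passed to your app as bytes" can look like in practice. In Lambda's Kafka events, keys and values arrive base64-encoded and header values arrive as lists of byte integers. The helper `decode_record` and its return shape (headers as a list of byte pairs) are assumptions for illustration. The adaptor's own decoding may differ.

```python
import base64
from typing import Any, Optional


def decode_record(
    record: dict[str, Any],
) -> tuple[Optional[bytes], bytes, list[tuple[bytes, bytes]]]:
    """Decode one Kafka record from a Lambda event into raw bytes.

    Returns (key, value, headers). This is a hypothetical helper.
    """
    # Kafka keys are optional, so a record may not carry one.
    raw_key = record.get("key")
    key = base64.b64decode(raw_key) if raw_key is not None else None
    value = base64.b64decode(record["value"])

    # Each header is a single-entry dict: name -> list of byte values.
    headers = [
        (name.encode("utf-8"), bytes(byte_values))
        for header in record.get("headers", [])
        for name, byte_values in header.items()
    ]
    return key, value, headers


sample_record = {
    "topic": "orders",
    "partition": 0,
    "offset": 15,
    "key": base64.b64encode(b"k1").decode("ascii"),
    "value": base64.b64encode(b'{"item_ids": ["a"]}').decode("ascii"),
    "headers": [{"trace": [97, 98]}],
}

key, value, headers = decode_record(sample_record)
```

Because the app receives bytes, any deserialization (JSON, Avro, and so on) is up to the application or the framework sitting on top of AMGI.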
Nack handling
By default, records that are negatively acknowledged or never acknowledged are logged:
handler = KafkaEventSourceMappingHandler(app, on_nack="log")
To fail the invocation when any record is nacked, configure the handler to raise an error instead:
handler = KafkaEventSourceMappingHandler(app, on_nack="error")
This is useful when running in environments where a failed invocation should trigger a retry or an alert.
When using this mode, handlers must be idempotent. Kafka event source mappings may re-deliver records after failures, restarts, or rebalances, and your application logic should be safe to execute more than once for the same record.
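One common way to make a handler safe to run more than once is to track a stable identifier per record and skip work already done. The following is a minimal sketch of that idea. The `order_id` field, the `handle_order` function, and the in-memory `_processed` set are all hypothetical. In a real deployment the set would need to be a durable store (a database table, for example), since in-process memory does not survive across execution environments.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Order:
    order_id: str  # hypothetical stable identifier used for deduplication
    item_ids: list[str]


# Stand-in for a durable store; in-process memory is for illustration only.
_processed: set[str] = set()
fulfilled: list[str] = []


async def handle_order(order: Order) -> bool:
    """Process an order at most once; return False for a repeat delivery."""
    if order.order_id in _processed:
        return False  # already handled, so a re-delivery is a no-op
    fulfilled.append(order.order_id)  # the actual side effect
    _processed.add(order.order_id)
    return True


order = Order(order_id="o-1", item_ids=["a", "b"])
first = asyncio.run(handle_order(order))
second = asyncio.run(handle_order(order))  # simulated re-delivery
```

Here the second call returns without repeating the side effect, which is the property a retried batch relies on.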
Lifespan
Lifespan support is enabled by default.
- Startup runs once per Lambda execution environment
- Shutdown is attempted when the environment is terminated
Shutdown handling relies on signal.SIGTERM, which is supported by Python 3.12 and later Lambda runtimes.
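As a rough illustration of SIGTERM-based shutdown, the sketch below wraps a shutdown callback in a signal handler using the standard library `signal` module. The names `make_sigterm_handler` and `install_shutdown_hook` are hypothetical, and this is not a description of how the adaptor wires up lifespan internally.

```python
import signal
from types import FrameType
from typing import Callable, Optional


def make_sigterm_handler(
    shutdown: Callable[[], None],
) -> Callable[[int, Optional[FrameType]], None]:
    """Wrap a shutdown callback in the signature signal.signal expects."""

    def _on_sigterm(signum: int, frame: Optional[FrameType]) -> None:
        shutdown()

    return _on_sigterm


def install_shutdown_hook(shutdown: Callable[[], None]) -> None:
    """Register the callback for SIGTERM; must be called from the main thread."""
    signal.signal(signal.SIGTERM, make_sigterm_handler(shutdown))


# Simulate delivery by calling the handler directly instead of sending a signal.
events: list[str] = []
on_sigterm = make_sigterm_handler(lambda: events.append("shutdown"))
on_sigterm(signal.SIGTERM, None)
```

Shutdown work triggered this way should be short, since the environment may be terminated shortly after the signal is delivered.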
To use fully stateless, per-invocation behavior, disable lifespan:
handler = KafkaEventSourceMappingHandler(app, lifespan=False)
Contact
For questions or suggestions, please contact jack.burridge@mail.com.
License
Copyright 2026 AMGI