Skip to main content

Kafka event source mapping adaptor for AMGI applications

Project description

amgi-kafka-event-source-mapping

amgi-kafka-event-source-mapping is an adaptor for AMGI applications to run in a Kafka event source mapped environment.

Installation

pip install amgi-kafka-event-source-mapping==0.38.0

Example

This example uses AsyncFast:

from dataclasses import dataclass

from amgi_kafka_event_source_mapping import KafkaEventSourceMappingHandler
from asyncfast import AsyncFast

app = AsyncFast()


@dataclass
class Order:
    item_ids: list[str]


@app.channel("orders")
async def orders(order: Order) -> None:
    # Makes an order
    ...


handler = KafkaEventSourceMappingHandler(app)

What it does

  • Converts Kafka batch events into AMGI message.receive events
  • Uses the Kafka topic name as the AMGI message address
  • Supports partial batch failures so only failed records are reported
  • Sends outbound messages to Kafka using an async producer
  • Outbound messages are sent via the same Kafka broker (bootstrap servers) that the records were received from
  • Optionally manages application startup and shutdown via AMGI lifespan

Record handling

  • Record values and keys are passed to your app as bytes
  • Kafka record headers become AMGI headers
  • Records are only acknowledged when your app emits message.ack
  • Records that emit message.nack or are not acknowledged are treated as failures

Nack handling

By default, records that are negatively acknowledged, or not acknowledged are logged:

handler = KafkaEventSourceMappingHandler(app, on_nack="log")

To fail the invocation when any record is nacked, configure the handler to raise an error instead:

handler = KafkaEventSourceMappingHandler(app, on_nack="error")

This is useful when running in environments where a failed invocation should trigger a retry, or alert.

When using this mode, handlers must be idempotent. Kafka event source mappings may re-deliver records after failures, restarts, or rebalances, and your application logic should be safe to execute more than once for the same record.

Lifespan

Lifespan support is enabled by default.

  • Startup runs once per Lambda execution environment
  • Shutdown is attempted when the environment is terminated

Shutdown handling relies on signal.SIGTERM, which is supported by Python 3.12 and later Lambda runtimes.

To use fully stateless, per-invocation behavior, disable lifespan:

handler = KafkaEventSourceMappingHandler(app, lifespan=False)

Contact

For questions or suggestions, please contact jack.burridge@mail.com.

License

Copyright 2026 AMGI

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amgi_kafka_event_source_mapping-0.38.0.tar.gz (5.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file amgi_kafka_event_source_mapping-0.38.0.tar.gz.

File metadata

  • Download URL: amgi_kafka_event_source_mapping-0.38.0.tar.gz
  • Upload date:
  • Size: 5.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.10.8 {"installer":{"name":"uv","version":"0.10.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for amgi_kafka_event_source_mapping-0.38.0.tar.gz
Algorithm Hash digest
SHA256 702ed757554e7c98a6803e6d549ab2f124f47859f5832082a0cccb7b007b93d3
MD5 10e2f8f14b2073c3a2d7c4831e3fc451
BLAKE2b-256 54ed9e888fa3e27f1291d723b954784bb545aa931df8e403c903c1fe573e8077

See more details on using hashes here.

File details

Details for the file amgi_kafka_event_source_mapping-0.38.0-py3-none-any.whl.

File metadata

  • Download URL: amgi_kafka_event_source_mapping-0.38.0-py3-none-any.whl
  • Upload date:
  • Size: 6.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.10.8 {"installer":{"name":"uv","version":"0.10.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for amgi_kafka_event_source_mapping-0.38.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f55344bcc3d88e8179d3c27bb4144401b57dc3e6227d7adfd37a489a56236168
MD5 a5e7a80d0dbf6626aa969ee434ebfe5b
BLAKE2b-256 526c8f76c5742a54b3d2b38c8cc3864d4217e9d4f0763ca632e9cf2afffec869

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page