
Apache Airflow Kafka provider containing Deferrable Operators & Sensors.

Project description

⚠️ Discontinuation of project

Astronomer donated this provider to the official Apache Airflow repository in March 2023. Since then, the original Astronomer repository and its PyPI package have been discontinued; the provider now lives on as the official apache-airflow-providers-apache-kafka package.

Please note that the code available in the original repository may not work with the latest dependencies or platforms, and it could contain security vulnerabilities. Astronomer can’t offer guarantees or warranties for its use. Thanks for being part of the open-source journey and helping keep great ideas alive!

Kafka Airflow Provider


An Airflow provider to:

  • interact with Kafka clusters
  • read from topics
  • write to topics
  • wait for specific messages to arrive in a topic

This package currently contains:

3 hooks (airflow_provider_kafka.hooks):

  • admin_client.KafkaAdminClientHook - a hook for working with the underlying Kafka admin client
  • consumer.KafkaConsumerHook - a hook that creates a consumer and provides it for interaction
  • producer.KafkaProducerHook - a hook that creates a producer and provides it for interaction

4 operators (airflow_provider_kafka.operators):

  • await_message.AwaitKafkaMessageOperator - a deferrable operator (sensor) that waits for a specific message to appear in a topic before triggering downstream tasks.
  • consume_from_topic.ConsumeFromTopicOperator - an operator that reads from a topic and applies a function to each message fetched.
  • produce_to_topic.ProduceToTopicOperator - an operator that uses an iterable to produce messages as key/value pairs to a Kafka topic.
  • event_triggers_function.EventTriggersFunctionOperator - an operator that listens for messages on a topic, triggers a downstream function for each one, and then resumes listening.

1 trigger (airflow_provider_kafka.triggers):

  • await_message.AwaitMessageTrigger

Quick start

pip install airflow-provider-kafka

Example usages:

FAQs

Why confluent-kafka and not (other library)? A few reasons: the confluent-kafka library is guaranteed to be 1:1 functional with librdkafka, it is faster, and it is maintained by a company with a commercial stake in ensuring its continued quality and upkeep as a product.

Why not release this into Airflow directly? I could probably make the PR and get it through, but the Airflow code base is getting huge and I don't want to burden the maintainers with code they don't own. Also, there have been multiple attempts to get a Kafka provider in before, and this is just faster.

Why is most of the configuration handled in a dict? Because that's how confluent-kafka does it. I'd rather maintain interfaces that people already using Kafka are comfortable with as a starting point. I'm happy to add more options/interfaces later, but I would prefer to be thoughtful about it to ensure that the differences between these operators and the actual client interface are minimal.
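Concretely, the configuration is a plain dict of librdkafka settings handed through to the client. The keys below are standard confluent-kafka/librdkafka options; the broker addresses and group name are placeholders:

```python
# A confluent-kafka style configuration dict; these keys are standard
# librdkafka settings and are passed through to the client unchanged.
consumer_config = {
    "bootstrap.servers": "broker-1:9092,broker-2:9092",
    "group.id": "airflow-consumers",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,
}

# The same dict shape covers anything librdkafka accepts, e.g. security:
secure_config = {
    **consumer_config,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
}
```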

How performant is this? We're not replacing native consumer/producer applications with this - but if you have some light or medium weight batch processes you need to run against a Kafka cluster, this should get you started while you figure out whether you need to scale up into something purpose-built.

Local Development

Getting started:

  1. pip install angreal && angreal dev-setup
angreal 2.0.3

USAGE:
    angreal [OPTIONS] <SUBCOMMAND>

OPTIONS:
    -h, --help       Print help information
    -v, --verbose    verbose level, (may be used multiple times for more verbosity)
    -V, --version    Print version information

SUBCOMMANDS:
    demo-clean      shut down services and remove files
    demo-start      start services for example dags
    demo-stop       stop services for example dags
    dev-setup       setup a development environment
    help            Print this message or the help of the given subcommand(s)
    init            Initialize an Angreal template from source.
    lint            lint our project
    run-tests       run our test suite. default is unit tests only
    static-tests    run static analyses on our project

Setup on M1 Mac

Installing on an M1 (Apple Silicon) Mac requires a Homebrew install of the librdkafka library before you can pip install confluent-kafka:

brew install librdkafka
export C_INCLUDE_PATH="$(brew --prefix librdkafka)/include"
export LIBRARY_PATH="$(brew --prefix librdkafka)/lib"
pip install confluent-kafka


Download files

Source Distribution

airflow_provider_kafka-0.2.5.tar.gz (17.9 kB)

Built Distribution

airflow_provider_kafka-0.2.5-py3-none-any.whl (21.6 kB)

File details

Details for the file airflow_provider_kafka-0.2.5.tar.gz.

File metadata

  • Size: 17.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

  • SHA256: 29c9b21cce3db43cfc33044906c668aea54dd5668c6b344be2729af19980ceb6
  • MD5: b851b5a48fc2feaedfa87e8858bda58b
  • BLAKE2b-256: 746c6a5de0142299af4058d09bcf539bbc064bc8f5c85a2c67cc22c9e270eb81

Provenance

Publisher: release.yaml on astronomer/airflow-provider-kafka

File details

Details for the file airflow_provider_kafka-0.2.5-py3-none-any.whl.

File hashes

  • SHA256: 2d67295146e694cc4fb0a2f872c475e911dc4cc0da75b0ae1c0a2fa241262e36
  • MD5: 68811828a2b7b245fd9c305478a4c51b
  • BLAKE2b-256: 6a766bdcddf67d7472d42f5ab4e6e2a915d9837e10fadcced7cefeb74cc3b79d

Provenance

Publisher: release.yaml on astronomer/airflow-provider-kafka
