Apache Airflow Kafka provider containing Deferrable Operators & Sensors.

Project description

Kafka Airflow Provider

An Airflow provider to:

  • interact with Kafka clusters
  • read from topics
  • write to topics
  • wait for specific messages to arrive in a topic

This package currently contains:

3 hooks (airflow_provider_kafka.hooks):

  • admin_client.KafkaAdminClientHook - a hook to work against the actual Kafka admin client
  • consumer.KafkaConsumerHook - a hook that creates a consumer and provides it for interaction
  • producer.KafkaProducerHook - a hook that creates a producer and provides it for interaction

4 operators (airflow_provider_kafka.operators):

  • await_message.AwaitKafkaMessageOperator - a deferrable operator (sensor) that waits for a message to appear in the topic before triggering downstream tasks.
  • consume_from_topic.ConsumeFromTopicOperator - an operator that reads from a topic and applies a function to each message fetched.
  • produce_to_topic.ProduceToTopicOperator - an operator that uses an iterable to produce messages as key/value pairs to a Kafka topic.
  • event_triggers_function.EventTriggersFunctionOperator - an operator that listens for messages on the topic and then triggers a downstream function before resuming listening.

1 trigger (airflow_provider_kafka.triggers):

  • await_message.AwaitMessageTrigger
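The trigger works by applying a caller-supplied function to each message read from the topic and firing once that function returns a truthy value. A minimal sketch of such a match function (the name `await_function` and the fire-on-truthy semantics are assumptions drawn from this provider's docs, not guarantees about this release):

```python
import json

# Illustrative match function for AwaitKafkaMessageOperator /
# AwaitMessageTrigger: applied to every message read from the topic.
# The trigger is assumed to fire once this returns a truthy value.
def await_function(message):
    payload = json.loads(message.value())
    if payload % 5 == 0:  # wait for a message whose value is a multiple of 5
        return f"Got the following message: {payload}"
    # implicit None (falsy) -> keep waiting
```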

Quick start

pip install airflow-provider-kafka

Example usages:
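The rendered examples are missing from this page; below is a minimal produce/consume sketch based on the operators listed above. The callables are plain Python and runnable as-is; the commented operator wiring uses parameter names (`topic`, `producer_function`, `kafka_config`, `apply_function`, `consumer_config`, …) assumed from this provider's docs, so treat it as illustrative rather than a verified DAG:

```python
import json

# producer_function: ProduceToTopicOperator is expected to call this and
# send each yielded (key, value) pair to the configured topic.
def producer_function():
    for i in range(20):
        yield (json.dumps(i), json.dumps(i + 1))

# consumer_function: ConsumeFromTopicOperator is expected to apply this to
# every message it fetches; `message` would be a confluent-kafka Message.
def consumer_function(message, prefix=None):
    key = json.loads(message.key())
    value = json.loads(message.value())
    return f"{prefix} {key}@{value}"

# Inside a DAG, the callables above would be wired up roughly like this
# (parameter names are assumptions, not verified against this release):
#
# from airflow_provider_kafka.operators.produce_to_topic import ProduceToTopicOperator
# from airflow_provider_kafka.operators.consume_from_topic import ConsumeFromTopicOperator
#
# t1 = ProduceToTopicOperator(
#     task_id="produce_to_topic",
#     topic="test_1",
#     producer_function=producer_function,
#     kafka_config={"bootstrap.servers": "localhost:9092"},
# )
# t2 = ConsumeFromTopicOperator(
#     task_id="consume_from_topic",
#     topics=["test_1"],
#     apply_function=consumer_function,
#     apply_function_kwargs={"prefix": "consumed:::"},
#     consumer_config={
#         "bootstrap.servers": "localhost:9092",
#         "group.id": "foo",
#         "enable.auto.commit": False,
#         "auto.offset.reset": "beginning",
#     },
#     max_messages=10,
# )
# t1 >> t2
```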

FAQs

Why confluent-kafka and not (other library)? A few reasons: the confluent-kafka library is guaranteed to be 1:1 functional with librdkafka, is faster, and is maintained by a company with a commercial stake in ensuring its continued quality and upkeep as a product.

Why not release this into Airflow directly? I could probably make the PR and get it through, but the Airflow code base is getting huge and I don't want to burden the maintainers with code that they don't own for maintenance. Also, there have been multiple attempts to get a Kafka provider in before, and this is just faster.

Why is most of the configuration handled in a dict? Because that's how confluent-kafka does it. I'd rather maintain interfaces that people already using Kafka are comfortable with as a starting point - I'm happy to add more options/interfaces later, but would prefer to be thoughtful about it, to ensure that the difference between these operators and the actual client interface is minimal.
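Concretely, the config dicts passed to these hooks and operators use plain librdkafka property names, the same ones a confluent-kafka `Consumer` or `Producer` accepts. A typical consumer-side dict (broker address and group id are placeholder values):

```python
# confluent-kafka style configuration: keys are librdkafka properties,
# passed through to the underlying client unchanged.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # broker list (placeholder)
    "group.id": "airflow-consumers",        # consumer group (placeholder)
    "enable.auto.commit": False,            # commit offsets explicitly
    "auto.offset.reset": "beginning",       # where to start with no stored offset
}
```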

Local Development

Unit Tests

Unit tests are located in tests/unit; a Kafka server isn't required to run these tests. Execute them with pytest.

Setup on M1 Mac

Installing on an M1 Mac requires a brew install of the librdkafka library before you can pip install confluent-kafka:

brew install librdkafka
export C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/include
export LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/lib
pip install confluent-kafka
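Note that the exports above pin the Cellar path for librdkafka 1.8.2; with any other Homebrew-installed version the path will differ. A version-agnostic variant (assuming Homebrew; the fallback path is an assumption for Apple-silicon defaults):

```shell
# Resolve librdkafka's install prefix instead of hard-coding a version.
if command -v brew >/dev/null 2>&1; then
    LIBRDKAFKA_PREFIX="$(brew --prefix librdkafka)"
else
    # Fallback for shells where brew isn't on PATH (assumed default location).
    LIBRDKAFKA_PREFIX="/opt/homebrew/opt/librdkafka"
fi
export C_INCLUDE_PATH="$LIBRDKAFKA_PREFIX/include"
export LIBRARY_PATH="$LIBRDKAFKA_PREFIX/lib"
```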

Project details

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source distribution: airflow-provider-kafka-0.2.2.tar.gz (15.8 kB)

Built distribution: airflow_provider_kafka-0.2.2-py3-none-any.whl (20.6 kB)

