Apache Airflow Kafka provider containing Deferrable Operators & Sensors.

Kafka Airflow Provider

An Airflow provider to:

  • interact with Kafka clusters
  • read from topics
  • write to topics
  • wait for specific messages to arrive on a topic

This package currently contains:

3 hooks:

  • airflow_provider_kafka.hooks.admin_client.KafkaAdminClientHook - a hook to work with the underlying Kafka admin client
  • airflow_provider_kafka.hooks.consumer.KafkaConsumerHook - a hook that creates a consumer and provides it for interaction
  • airflow_provider_kafka.hooks.producer.KafkaProducerHook - a hook that creates a producer and provides it for interaction (a usage sketch follows this list)
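
For example, a task callable might pull a producer straight from the producer hook. The snippet below is a sketch only: the kafka_config constructor argument and the get_producer() method are assumptions inferred from the descriptions above, not a documented API.

    from airflow_provider_kafka.hooks.producer import KafkaProducerHook

    # Hypothetical usage -- constructor argument and method name are assumed.
    hook = KafkaProducerHook(kafka_config={"bootstrap.servers": "broker:29092"})
    producer = hook.get_producer()  # assumed to return a confluent_kafka.Producer
    producer.produce("test_1", key="hello", value="kafka")
    producer.flush()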

3 operators:

  • airflow_provider_kafka.operators.await_message.AwaitKafkaMessageOperator - a deferrable operator (sensor) that waits for a specific message to appear in the topic log before triggering downstream tasks.
  • airflow_provider_kafka.operators.consume_from_topic.ConsumeFromTopicOperator - an operator that reads from a topic and applies a function to each message fetched.
  • airflow_provider_kafka.operators.produce_to_topic.ProduceToTopicOperator - an operator that uses an iterable to produce messages as key/value pairs to a Kafka topic.

1 trigger:

  • airflow_provider_kafka.triggers.await_message.AwaitMessageTrigger - the trigger that backs the deferrable AwaitKafkaMessageOperator.

Quick start

pip install airflow-provider-kafka

    import json
    import logging

    from airflow_provider_kafka.operators.await_message import AwaitKafkaMessageOperator
    from airflow_provider_kafka.operators.consume_from_topic import ConsumeFromTopicOperator
    from airflow_provider_kafka.operators.produce_to_topic import ProduceToTopicOperator


    def producer_function():
        # Yield (key, value) pairs; each pair becomes one message on the topic.
        for i in range(20):
            yield (json.dumps(i), json.dumps(i + 1))


    consumer_logger = logging.getLogger("airflow")
    def consumer_function(message, prefix=None):
        # Log every message fetched from the topic.
        key = json.loads(message.key())
        value = json.loads(message.value())
        consumer_logger.info(f"{prefix} {message.topic()} @ {message.offset()}; {key} : {value}")


    def await_function(message):
        # Return a truthy value to stop waiting; the returned value is what
        # gets pushed to XCom (see xcom_push_key below).
        if json.loads(message.value()) % 5 == 0:
            return f" Got the following message: {json.loads(message.value())}"

    # Callables are referenced by dotted path; this example assumes the file
    # above is importable as the module hello_kafka.
    t1 = ProduceToTopicOperator(
        task_id="produce_to_topic",
        topic="test_1",
        producer_function="hello_kafka.producer_function",
        kafka_config={"bootstrap.servers": "broker:29092"},
    )

    t2 = ConsumeFromTopicOperator(
        task_id="consume_from_topic",
        topics=["test_1"],
        apply_function="hello_kafka.consumer_function",
        apply_function_kwargs={"prefix": "consumed:::"},
        consumer_config={
            "bootstrap.servers": "broker:29092",
            "group.id": "foo",
            "enable.auto.commit": False,
            "auto.offset.reset": "beginning",
        },
        commit_cadence="end_of_batch",
        max_messages=10,
        max_batch_size=2,
    )

    t3 = AwaitKafkaMessageOperator(
        task_id="awaiting_message",
        topics=["test_1"],
        apply_function="hello_kafka.await_function",
        kafka_config={
            "bootstrap.servers": "broker:29092",
            "group.id": "awaiting_message",
            "enable.auto.commit": False,
            "auto.offset.reset": "beginning",
        },
        xcom_push_key="retrieved_message",
    )
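
The snippet above only instantiates the operators; in an actual deployment they need to belong to a DAG. A minimal sketch of that wiring (the dag_id, dates, and schedule below are placeholder assumptions, not part of this provider):

    from datetime import datetime

    from airflow import DAG
    from airflow_provider_kafka.operators.produce_to_topic import ProduceToTopicOperator

    with DAG(
        dag_id="kafka_example",    # placeholder name
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,    # trigger manually
        catchup=False,
    ) as dag:
        # Operators created inside this block register with the DAG.
        t1 = ProduceToTopicOperator(
            task_id="produce_to_topic",
            topic="test_1",
            producer_function="hello_kafka.producer_function",
            kafka_config={"bootstrap.servers": "broker:29092"},
        )
        # ...define t2 and t3 the same way inside this block, then chain:
        # t1 >> t2 >> t3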

FAQs

**Why confluent-kafka and not (other library)?** A few reasons, but primarily: the confluent-kafka library is guaranteed to be 1:1 functional with librdkafka, is faster, and is maintained by a company with a commercial stake in ensuring the continued quality and upkeep of its products.

**Why not release this into Airflow directly?** I could probably make the PR and get it through, but the Airflow code base is getting huge and I don't want to burden the maintainers with maintaining a code base they don't own. Also, there have been multiple attempts to get a Kafka provider in before, and this is just faster.

**Why is most of the configuration handled in a dict?** Because that's basically how confluent-kafka does it. I'd rather maintain interfaces that people already using Kafka are comfortable with as a starting point. I'm happy to add more options/interfaces later, but I'd prefer to be thoughtful about it, to ensure that the differences between these operators and the actual client interface stay minimal.
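
Concretely, the same dict that confluent-kafka accepts directly is what these operators take; a small illustration (the broker address is a placeholder):

    from confluent_kafka import Consumer

    # A plain confluent-kafka configuration dict...
    config = {"bootstrap.servers": "broker:29092", "group.id": "foo"}
    consumer = Consumer(config)

    # ...is also what the operators accept, e.g.
    # ConsumeFromTopicOperator(consumer_config=config, ...)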

Development

Unit Tests

Unit tests are located at tests/unit; a Kafka server isn't required to run them. Execute with pytest:
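
pytest tests/unit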

Setup on M1 Mac

Installing on an M1 Mac requires a brew install of the librdkafka library before you can pip install confluent-kafka (adjust the Cellar paths below to the version brew installs):

brew install librdkafka
export C_INCLUDE_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/include
export LIBRARY_PATH=/opt/homebrew/Cellar/librdkafka/1.8.2/lib
pip install confluent-kafka
