
Kafka pipeline


os-scrapy-kafka-pipeline


This project provides a pipeline to send Scrapy Items to Kafka in JSON format.

Features:

  • configure default Kafka brokers and topic in the settings.py file
  • pass kafka-python producer init args
  • dynamically connect and send to other Kafka clusters and topics using item meta
  • items are sent to Kafka in JSON format; bytes members that cannot be UTF-8 decoded can be encoded to base64 strings

Install

pip install os-scrapy-kafka-pipeline

You can run the example spider directly from the project root:

scrapy crawl example

Usage

Settings

  • Enable pipeline in the project settings.py file

    ITEM_PIPELINES = {
        "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
    }
    
  • Configure default Kafka brokers

    KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
    
    • brokers in the item meta will override this default value
    • the pipeline will not be enabled if a Kafka connection cannot be established with this setting
    • an exception will be raised when no brokers are configured
  • Configure the default Kafka producer

    KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
    
    • this is a global config; dynamic connections will also use these configs
    • the bootstrap_servers option will be ignored when KAFKA_PRODUCER_BROKERS is already configured
  • Configure the default topic

    KAFKA_PRODUCER_TOPIC = "topic01"
    
    • the topic set in item.meta will override this config
    • an exception will be raised when no topic is configured
  • Configure the kafka-python loglevel (default: "WARNING")

    KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
    
  • Configure the Kafka producer close timeout (default: None)

    KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
    
  • Ensure base64

    Bytes-typed item members are decoded as UTF-8; if decoding fails, the pipeline will encode the bytes with base64 when you set:

    KAFKA_VALUE_ENSURE_BASE64 = True
    
  • Filter field

    You can filter out item fields so they are not exported and sent to Kafka. A combined settings sketch follows this list.

    KAFKA_EXPORT_FILTER = ["filtered_field"]
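
For convenience, here is a combined settings.py sketch of all the options documented above; the broker addresses, topic, and client_id are example placeholders, not defaults.

    # settings.py: combined sketch of the options above (values are placeholders)
    ITEM_PIPELINES = {
        "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
    }

    KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
    KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
    KAFKA_PRODUCER_TOPIC = "topic01"
    KAFKA_PRODUCER_LOGLEVEL = "WARNING"
    KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
    KAFKA_VALUE_ENSURE_BASE64 = True
    KAFKA_EXPORT_FILTER = ["filtered_field"]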
    

Dynamic Kafka Connection with item.meta

  • you can set the topic, key, partition, and brokers using item["meta"] (see the sketch after this section)

  • the item must have a meta member whose type is dict

  • options:

    meta = {
        "kafka.topic": "topic01",
        "kafka.key": "key01",
        "kafka.partition": 1,
        "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092"
    }
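
A minimal spider sketch that routes an item through item["meta"]; the spider and item field names here are hypothetical, only the "kafka.*" meta keys come from this pipeline.

    import scrapy

    class KafkaExampleItem(scrapy.Item):
        url = scrapy.Field()
        meta = scrapy.Field()  # routing options read by the pipeline

    class KafkaExampleSpider(scrapy.Spider):
        name = "kafka_example"
        start_urls = ["http://example.com"]

        def parse(self, response):
            yield KafkaExampleItem(
                url=response.url,
                meta={
                    "kafka.topic": "topic01",
                    "kafka.key": "key01",
                    "kafka.partition": 1,
                    "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
                },
            )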
    

Storage Format

Items are sent to Kafka in JSON format; bytes are encoded to base64.
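
For illustration, a rough sketch of this behavior (not the pipeline's actual exporter code; the field names are made up):

    import base64
    import json

    def to_kafka_value(item: dict) -> bytes:
        # decode bytes as UTF-8 when possible, otherwise fall back to base64
        def convert(value):
            if isinstance(value, bytes):
                try:
                    return value.decode("utf-8")
                except UnicodeDecodeError:
                    return base64.b64encode(value).decode("ascii")
            return value

        return json.dumps({k: convert(v) for k, v in item.items()}).encode("utf-8")

    print(to_kafka_value({"url": b"http://example.com", "raw": b"\xff\xfe"}))
    # b'{"url": "http://example.com", "raw": "//4="}'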

Unit Tests

sh scripts/test.sh

License

MIT licensed.

