Kafka Pipeline
Project description
os-scrapy-kafka-pipeline
This project provides a pipeline that sends Scrapy items to Kafka in JSON format.
Features:
- supports configuring default Kafka brokers and topic in the settings.py file
- supports kafka-python producer init args
- supports dynamically connecting and sending to other Kafka clusters and topics via item meta
- items are sent to Kafka in JSON format; bytes values that cannot be UTF-8 decoded can be encoded as base64 strings
Install
pip install os-scrapy-kafka-pipeline
You can run the example spider directly from the project root:
scrapy crawl example
Usage
Settings
- Enable the pipeline in the project settings.py file:
ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}
- Configure default Kafka brokers:
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
  - brokers set in the item meta override this default value
  - the pipeline will not be enabled if it cannot start a Kafka connection with these settings
  - an exception is raised when no brokers are configured
- Configure the default Kafka producer:
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
  - this is a global config; dynamic connections also use it
  - the bootstrap_servers option is ignored when KAFKA_PRODUCER_BROKERS is already configured
- Configure the default topic:
KAFKA_PRODUCER_TOPIC = "topic01"
  - the topic set in the item meta overrides this config
  - an exception is raised when no topic is configured
- Configure the kafka-python log level (default: "WARNING"):
KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
- Configure the Kafka producer close timeout (default: None):
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
- Ensure base64:
Bytes-typed item members are decoded as UTF-8; when decoding fails, the pipeline encodes the bytes with base64 if you set:
KAFKA_VALUE_ENSURE_BASE64 = True
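The fallback behavior described above can be sketched as follows. This is an illustrative sketch, not the pipeline's actual code; `encode_value` is a hypothetical helper name.

```python
import base64


def encode_value(value: bytes) -> str:
    """Illustrative sketch of the KAFKA_VALUE_ENSURE_BASE64 behavior:
    try UTF-8 first, fall back to base64 when decoding fails."""
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError:
        return base64.b64encode(value).decode("ascii")


print(encode_value(b"plain text"))    # UTF-8 decodes cleanly
print(encode_value(b"\xff\xfe\x00"))  # not valid UTF-8, so base64-encoded
```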
- Filter fields:
You can filter out item fields so they are not exported and sent to Kafka:
KAFKA_EXPORT_FILTER = ["filtered_field"]
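Putting the options above together, a settings.py might look like the following sketch. Broker addresses, topic name, and values are placeholders taken from the examples above.

```python
# settings.py -- consolidated example of the options described above.
# All values are placeholders; adjust them for your cluster.

ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}

KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
KAFKA_PRODUCER_TOPIC = "topic01"
KAFKA_PRODUCER_LOGLEVEL = "WARNING"
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
KAFKA_VALUE_ENSURE_BASE64 = True
KAFKA_EXPORT_FILTER = ["filtered_field"]
```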
Dynamic Kafka Connection with item.meta
- you can set the topic, key, and partition using item["meta"]
- the item must have a meta member of type dict
- options:
meta = {
    "kafka.topic": "topic01",
    "kafka.key": "key01",
    "kafka.partition": 1,
    "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
}
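For example, an item carrying these overrides could be built as below. This is a plain-dict sketch (the "url" field is illustrative); the pipeline reads the "kafka.*" keys from the item's meta at send time.

```python
# Sketch of per-item routing via the meta member. The "kafka.*" keys
# override the default brokers/topic from settings for this item only.
item = {
    "url": "http://example.com",
    "meta": {
        "kafka.topic": "topic01",
        "kafka.key": "key01",
        "kafka.partition": 1,
        "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
    },
}

print(item["meta"]["kafka.topic"])  # -> topic01
```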
Storage Format
Items are sent to Kafka in JSON format; bytes values are encoded as base64.
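The serialization can be sketched as below. This is a hedged illustration of the storage format, assuming a `json.dumps` with a bytes-to-base64 default handler; the pipeline's exact field handling may differ.

```python
import base64
import json


def to_json(item: dict) -> str:
    """Sketch of the storage format: serialize an item to JSON,
    base64-encoding any bytes values along the way."""
    def default(obj):
        if isinstance(obj, bytes):
            return base64.b64encode(obj).decode("ascii")
        raise TypeError(f"unserializable type: {type(obj)!r}")
    return json.dumps(item, default=default)


print(to_json({"title": "hello", "raw": b"\x00\x01"}))
```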
Unit Tests
sh scripts/test.sh
License
MIT licensed.
Project details
Download files
Source Distribution
Hashes for os_scrapy_kafka_pipeline-0.0.15.tar.gz
Algorithm | Hash digest
---|---
SHA256 | 2dd80514049c771f5e92a9219aee7b70ec919c3b10f2edd2e043e95b9212ead5
MD5 | 4ad096a97f553c74832d19574cd07f4c
BLAKE2b-256 | e6aabfbca340b753dcde0f60ac54b39429dc0e60b7ac86cebf68c2d6642a409a