Kafka Pipeline
os-scrapy-kafka-pipeline
This project provides a pipeline to send Scrapy items to Kafka in JSON format.
Features:
- supports configuring default Kafka brokers and topic in the settings.py file
- supports kafka-python producer init args
- supports dynamically connecting and sending to other Kafka clusters and topics using item meta
- items are sent to Kafka in JSON format; bytes values that cannot be utf-8 decoded can be encoded as base64 strings
Install
pip install os-scrapy-kafka-pipeline
You can run the example spider directly from the project root path.
scrapy crawl example
Usage
Settings
- enable the pipeline in the project settings.py file:

  ITEM_PIPELINES = { "os_scrapy_kafka_pipeline.KafkaPipeline": 300, }
- config default Kafka brokers:

  KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]

  - brokers in the item meta will override this default value
  - the pipeline will not be enabled when this setting cannot start a Kafka connection
  - an exception will be raised when no brokers are configured
- config the default Kafka producer:

  KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}

  - this is a global config; dynamic connections also use it
  - the bootstrap_servers option will not take effect when KAFKA_PRODUCER_BROKERS is already configured
- config the default topic:

  KAFKA_PRODUCER_TOPIC = "topic01"

  - the config in item["meta"] will override this config
  - an exception will be raised when no topic is configured
- config the kafka-python log level (default "WARNING"):

  KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
- config the Kafka producer close timeout (default: None):

  KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
- ensure base64

  bytes members of an item are encoded with utf-8; if encoding fails, the pipeline can base64-encode the bytes when you set:

  KAFKA_VALUE_ENSURE_BASE64 = True
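Putting the options above together, a minimal settings.py sketch might look like the following; the broker addresses, client id, and topic name are the example values used in this README, not defaults.

```python
# settings.py (sketch): example values, adjust for your cluster

ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}

# default brokers and topic; item meta can override both
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
KAFKA_PRODUCER_TOPIC = "topic01"

# global kafka-python producer init args
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}

KAFKA_PRODUCER_LOGLEVEL = "WARNING"
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
KAFKA_VALUE_ENSURE_BASE64 = True
```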
Dynamic Kafka Connection with item.meta
- you can set topic, key, and partition using item["meta"]
- the item must have a meta member whose type is dict
- options:

  meta = { "kafka.topic": "topic01", "kafka.key": "key01", "kafka.partition": 1, "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092" }
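In context, such a meta dict sits alongside the item's other fields; the sketch below shows a hypothetical item (the url field and all values are placeholders) routed to a specific topic, key, partition, and broker list:

```python
# Hypothetical item: the "meta" dict carries the per-item Kafka routing
# options; all other fields are ordinary item data.
item = {
    "url": "http://example.com",
    "meta": {
        "kafka.topic": "topic01",
        "kafka.key": "key01",
        "kafka.partition": 1,
        "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
    },
}
```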
Storage Format
Items are sent to Kafka in JSON format; bytes values are encoded as base64.
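The serialization behavior described above can be sketched as follows; this mirrors the documented utf-8-then-base64 fallback, not the pipeline's actual code, and the field names are made up:

```python
import base64
import json

def encode_value(value):
    """Decode bytes as utf-8 when possible; otherwise fall back to base64."""
    if isinstance(value, bytes):
        try:
            return value.decode("utf-8")
        except UnicodeDecodeError:
            return base64.b64encode(value).decode("ascii")
    return value

# "title" is valid utf-8, "raw" is not
item = {"title": b"hello", "raw": b"\xff\xfe"}
payload = json.dumps({k: encode_value(v) for k, v in item.items()})
```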
Unit Tests
sh scripts/test.sh
License
MIT licensed.