Kafka pipeline
Project description
os-scrapy-kafka-pipeline
This project provides a pipeline to send Scrapy Items to Kafka in JSON format
Features:
- configure default Kafka brokers and topic in the project settings.py file
- pass kafka-python producer init args
- dynamically connect and send to other Kafka clusters and topics using item meta
- items are sent to Kafka in JSON format; bytes can be encoded as base64 strings when they cannot be utf-8 decoded
Install
pip install os-scrapy-kafka-pipeline
You can run the example spider directly from the project root:
scrapy crawl example
Usage
Settings
- Enable the pipeline in the project settings.py file:
ITEM_PIPELINES = { "os_scrapy_kafka_pipeline.KafkaPipeline": 300, }
- Configure default Kafka brokers:
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
- brokers in the item meta will override this default value
- the pipeline will not be enabled when this setting cannot start a Kafka connection
- an exception will be raised when no brokers are configured
- Configure the default Kafka producer:
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
- this is a global config; dynamic connections will also use it (see the sketch below)
- the bootstrap_servers option will not take effect when KAFKA_PRODUCER_BROKERS is already configured
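Presumably these two settings combine into a kafka-python producer roughly as follows; this is only a sketch of the behavior described above, not the pipeline's actual internals:

from kafka import KafkaProducer

# Values taken from the example settings above.
brokers = ["broker01.kafka:9092", "broker02.kafka:9092"]  # KAFKA_PRODUCER_BROKERS
configs = {"client_id": "id01", "retries": 1}              # KAFKA_PRODUCER_CONFIGS

# Any bootstrap_servers entry in the configs is ignored, because
# KAFKA_PRODUCER_BROKERS takes precedence when it is configured.
configs.pop("bootstrap_servers", None)
producer = KafkaProducer(bootstrap_servers=brokers, **configs)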
- Configure the default topic:
KAFKA_PRODUCER_TOPIC = "topic01"
- the topic in item.meta will override this config
- an exception will be raised when no topic is configured
- Configure the kafka-python log level (default: "WARNING"):
KAFKA_PRODUCER_LOGLEVEL = "DEBUG"
- Configure the Kafka producer close timeout (default: None):
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
- Ensure base64
Item members of bytes type are decoded as utf-8; if decoding fails, the pipeline will base64-encode the bytes (see the sketch below) when you set:
KAFKA_VALUE_ENSURE_BASE64 = True
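A minimal sketch of this fallback (the helper below is illustrative, not part of the pipeline's API):

import base64

def encode_bytes_member(value: bytes) -> str:
    # Try utf-8 first; when that fails and KAFKA_VALUE_ENSURE_BASE64 is set,
    # fall back to a base64 string so the value still fits into JSON.
    try:
        return value.decode("utf-8")
    except UnicodeDecodeError:
        return base64.b64encode(value).decode("ascii")

encode_bytes_member(b"hello")     # -> "hello"
encode_bytes_member(b"\xff\xfe")  # -> "//4=" (base64)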
- Filter fields
You can filter out item fields that will not be exported and sent to Kafka:
KAFKA_EXPORT_FILTER = ["filtered_field"]
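Putting the options above together, a project's settings.py might contain something like this (broker addresses and values are the illustrative ones used above):

ITEM_PIPELINES = {
    "os_scrapy_kafka_pipeline.KafkaPipeline": 300,
}
KAFKA_PRODUCER_BROKERS = ["broker01.kafka:9092", "broker02.kafka:9092"]
KAFKA_PRODUCER_CONFIGS = {"client_id": "id01", "retries": 1}
KAFKA_PRODUCER_TOPIC = "topic01"
KAFKA_PRODUCER_LOGLEVEL = "WARNING"
KAFKA_PRODUCER_CLOSE_TIMEOUT = 10
KAFKA_VALUE_ENSURE_BASE64 = True
KAFKA_EXPORT_FILTER = ["filtered_field"]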
Dynamic Kafka Connection with item.meta
- you can set the topic, key, partition, and brokers using item["meta"]
- the item must have a meta member whose type is dict
- options:
meta = {
    "kafka.topic": "topic01",
    "kafka.key": "key01",
    "kafka.partition": 1,
    "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
}
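For example, a spider could attach these options when yielding an item. The item class and field names below are illustrative; the only requirement stated above is a dict-typed meta member:

import scrapy

class ExampleItem(scrapy.Item):
    meta = scrapy.Field()  # per-item Kafka options read by the pipeline
    url = scrapy.Field()
    body = scrapy.Field()

class ExampleSpider(scrapy.Spider):
    name = "example"
    start_urls = ["http://example.com"]

    def parse(self, response):
        yield ExampleItem(
            url=response.url,
            body=response.text,
            meta={
                "kafka.topic": "topic01",
                "kafka.key": "key01",
                "kafka.partition": 1,
                "kafka.brokers": "broker01.kafka:9092,broker02.kafka:9092",
            },
        )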
Storage Format
Items are sent to Kafka in JSON format; bytes that cannot be utf-8 decoded are encoded as base64.
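For illustration only (field names and values are hypothetical), a message on the topic might look like:

{"url": "http://example.com", "title": "Example Domain", "image": "//4="}

where "image" held bytes that could not be utf-8 decoded and was therefore base64-encoded.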
Unit Tests
sh scripts/test.sh
License
MIT licensed.