Integrating Scrapy with Kafka using the confluent-kafka Python client
🚀 Kafka Scrapy Connect
Overview
kafka_scrapy_connect is a custom Scrapy library that integrates Scrapy with Kafka.
It consists of two main components: spiders and pipelines, which interact with Kafka for message consumption and item publishing.
This project was motivated by the great work undertaken in https://github.com/dfdeshom/scrapy-kafka.
kafka_scrapy_connect utilises Confluent's Kafka Python client under the hood to provide high-level producer and consumer features.
Features
1️⃣ Integration with Kafka 📈
- Enables communication between Scrapy spiders and Kafka topics for efficient data processing.
- Through partitions and consumer groups, message processing can be parallelised across multiple spiders!
- Reduces overhead and improves throughput by allowing messages to be consumed in batches.
2️⃣ Customisable Settings 🛠️
- Provides flexibility through customisable configuration for both consumers and producers.
3️⃣ Error Handling 🚑
- Automatically handles network errors during crawling and publishes failed URLs to a designated output topic.
4️⃣ Serialisation Customisation 🧬
- Allows users to customise how Kafka messages are deserialised by overriding the process_kafka_message method.
Installation
You can install kafka_scrapy_connect via pip:
pip install kafka_scrapy_connect
Example
A full example using the kafka_scrapy_connect library can be found inside the repo.
The only prerequisite for walking through the example is the installation of Docker 🐳.
This is needed because a Kafka cluster will be created locally using containers, so kafka_scrapy_connect can communicate with a broker.
Once all set, follow the steps below 👇
- Create a virtual environment, clone the repo and install requirements:
python3 -m venv .venv
source .venv/bin/activate
git clone https://github.com/spicyparrot/kafka_scrapy_connect.git && cd kafka_scrapy_connect
pip install -r requirements.txt
- Create a local kafka cluster with required topics:
bash ./examples/kafka/kafka_start.sh --input-topic ScrapyInput,1 --output-topic ScrapyOutput,1 --error-topic ScrapyError,1
- Initiate the spider:
cd examples/quotes && scrapy crawl quotes
- Publish a message to the input Kafka topic and watch the spider consume and process the message 🪄
- ☝️ This will require some custom producer code to publish messages (a sketch is shown after these steps), or go to https://localhost:8080 and use the UI to publish some example messages! (The UI was created when bringing up your local Kafka cluster.)
- When satisfied with testing, exit the spider and clean up the local Kafka cluster:
bash ./examples/kafka/kafka_stop.sh
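If you prefer scripting to the UI, below is a minimal producer sketch using confluent-kafka (which kafka_scrapy_connect uses under the hood). It assumes the local cluster from kafka_start.sh is reachable on localhost:29092; the URL is purely illustrative:
# publish_test_message.py — minimal sketch, assuming a broker on localhost:29092
import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:29092'})

# The default deserialiser accepts a plain URL string, or a JSON object
# containing a "url" key (see "Customising deserialisation" below)
payload = json.dumps({'url': 'https://quotes.toscrape.com/page/1/'})  # example URL
producer.produce('ScrapyInput', value=payload.encode('utf-8'))

# Block until outstanding messages have been delivered to the broker
producer.flush()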
Usage
Custom Settings
kafka_scrapy_connect supports the following custom settings:
- SCRAPY_KAFKA_HOSTS - A list of Kafka broker hosts. (Default: localhost:29092)
- SCRAPY_INPUT_TOPIC - Topic from which the spider(s) consume messages. (Default: ScrapyInput)
- SCRAPY_OUTPUT_TOPIC - Topic where scraped items are published. (Default: ScrapyOutput)
- SCRAPY_ERROR_TOPIC - Topic for publishing URLs that failed due to network errors. (Default: ScrapyError)
- SCRAPY_CONSUMER_CONFIG - Additional configuration options for Kafka consumers (see here)
- SCRAPY_PRODUCER_CONFIG - Additional configuration options for Kafka producers (see here)
- SCRAPY_KAFKA_PRODUCER - Key used for partitioning messages in the Kafka producer. (Default: "", i.e. round-robin partitioning)
- SCRAPY_KAFKA_PRODUCER_CALLBACKS - Enable or disable asynchronous message delivery callbacks. (Default: False)
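For illustration, a sketch of setting these in a project's settings.py (the values shown are simply the documented defaults):
# settings.py — sketch using the documented defaults
SCRAPY_KAFKA_HOSTS = ['localhost:29092']
SCRAPY_INPUT_TOPIC = 'ScrapyInput'
SCRAPY_OUTPUT_TOPIC = 'ScrapyOutput'
SCRAPY_ERROR_TOPIC = 'ScrapyError'
SCRAPY_KAFKA_PRODUCER_CALLBACKS = False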
Customisation
Customising deserialisation
You can customise how Kafka messages are deserialised by overriding the process_kafka_message method in your spider class.
This allows for handling custom message formats or data transformations.
class CustomSpider(KafkaListeningSpider):
def process_kafka_message(self, message, meta={}, headers={}):
# Custom deserialization logic
# Return URL, metadata or None if extraction fails
pass
⚠️ By default, if no custom process_kafka_message method is provided, the spider's process_kafka_message method expects a JSON payload or a string containing a valid URL. If it's a JSON object, it expects the URL under a url key.
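As a concrete sketch, suppose your messages arrive as JSON with a hypothetical link field and an optional category to carry along as metadata. Following the stub above (return the URL and metadata, or None when extraction fails), an override might look like:
import json

class CustomSpider(KafkaListeningSpider):
    def process_kafka_message(self, message, meta={}, headers={}):
        # message.value() returns the raw Kafka payload as bytes
        raw = message.value()
        if raw is None:
            return None  # nothing to extract
        try:
            payload = json.loads(raw.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            return None  # extraction failed
        url = payload.get('link')  # hypothetical field name
        if not url:
            return None
        meta = {'category': payload.get('category')}  # pass custom metadata along
        return url, meta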
Customising Producer & Consumer settings
You can customise producer and consumer settings by providing a dictionary of configuration options in your Scrapy settings under SCRAPY_PRODUCER_CONFIG and SCRAPY_CONSUMER_CONFIG.
# Example SCRAPY_PRODUCER_CONFIG
SCRAPY_PRODUCER_CONFIG = {
'compression.type': 'gzip',
'request.timeout.ms': 5000
}
# Example SCRAPY_CONSUMER_CONFIG
SCRAPY_CONSUMER_CONFIG = {
'fetch.wait.max.ms': 10,
'max.poll.interval.ms': 600000,
'auto.offset.reset': 'latest'
}
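Because these dictionaries are passed through to the underlying Kafka client, standard librdkafka options apply. For instance (a hedged sketch with a hypothetical group name), giving several spider processes the same group.id should place them in one consumer group, so partitions, and therefore messages, are balanced across them, as mentioned in the features above:
# Hedged sketch: share one consumer group across several spider processes
SCRAPY_CONSUMER_CONFIG = {
    'group.id': 'quotes-spiders',      # hypothetical consumer group name
    'auto.offset.reset': 'earliest'    # start from the beginning if no committed offset
}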