Integrating Scrapy with Kafka using the confluent-kafka Python client

Python 3.9 Python 3.10 Python 3.11

🚀 Kafka Scrapy Connect

Overview

kafka_scrapy_connect is a custom Scrapy library that integrates Scrapy with Kafka.

It consists of two main components: spiders and pipelines, which interact with Kafka for message consumption and item publishing.

The library also comes with a custom extension that publishes log stats to a Kafka topic at end-of-day, which lets the user analyse offline how well the spider is performing.

This project has been motivated by the great work undertaken in: https://github.com/dfdeshom/scrapy-kafka.

kafka_scrapy_connect utilises Confluent's Kafka Python client under the hood, to provide high-level producer and consumer features.

Features

1️⃣ Integration with Kafka 📈

  • Enables communication between Scrapy spiders and Kafka topics for efficient data processing.
  • Through partitions and consumer groups, message processing can be parallelised across multiple spiders!
  • Reduces overhead and improves throughput by giving the user the ability to consume messages in batches.

2️⃣ Customisable Settings 🛠️

  • Provides flexibility through customisable configuration for both consumers and producers.

3️⃣ Error Handling 🚑

  • Automatically handles network errors during crawling and publishes failed URLs to a designated output topic.

4️⃣ Serialisation Customisation 🧬

  • Allows users to customise how Kafka messages are deserialised by overriding the process_kafka_message method.

Installation

You can install kafka_scrapy_connect via pip:

pip install kafka_scrapy_connect

Example

A full example using the kafka_scrapy_connect library can be found inside the repo.

The only prerequisite for walking through the example is the installation of Docker 🐳 .

Docker is needed because a Kafka cluster is created locally using containers, so that kafka_scrapy_connect can communicate with a broker.

Once Docker is installed, follow the steps below 👇

  1. Create a virtual environment, clone the repo and install requirements:
python3 -m venv .venv
source .venv/bin/activate
git clone https://github.com/spicyparrot/kafka_scrapy_connect.git && cd kafka_scrapy_connect
pip install -r requirements.txt
  2. Create a local Kafka cluster with the required topics:
bash ./examples/kafka/kafka_start.sh --input-topic ScrapyInput,1 --output-topic ScrapyOutput,1 --error-topic ScrapyError,1
  3. Initiate the spider:
cd examples/quotes && scrapy crawl quotes
  4. Publish a message to the input Kafka topic and watch the spider consume and process the message 🪄

    1. ☝️ This requires either some custom producer code or the UI at https://localhost:8080, where you can publish example messages. (The UI was created when bringing up your local Kafka cluster.)
  5. When satisfied with testing, exit the spider and clean up the local Kafka cluster:

bash ./examples/kafka/kafka_stop.sh
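
The publish step above mentions custom producer code. A minimal sketch of what that could look like is shown here, assuming the default JSON payload shape with a url key, the default broker address localhost:29092 and the ScrapyInput topic. The helper names build_message, publish_urls and main are hypothetical and not part of kafka_scrapy_connect, and the target URL is only an illustration.

```python
import json


def build_message(url):
    """Encode a URL as a JSON object with a "url" key (the default payload shape)."""
    return json.dumps({"url": url}).encode("utf-8")


def publish_urls(producer, topic, urls):
    """Send one message per URL using any object that offers produce() and flush()."""
    for url in urls:
        producer.produce(topic, value=build_message(url))
    # flush() blocks until the queued messages have been delivered or have failed
    producer.flush()


def main():
    # Requires the confluent-kafka package and a running local cluster.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:29092"})
    # The URL below is an assumption chosen for illustration only.
    publish_urls(producer, "ScrapyInput", ["https://quotes.toscrape.com/"])
```

Calling main() while the spider is running should make the message appear in the spider's log. Keeping publish_urls independent of the concrete producer class makes it easy to exercise without a broker.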

Usage

Custom Settings

kafka_scrapy_connect supports the following custom settings:

  • SCRAPY_KAFKA_HOSTS - A list of kafka broker hosts. (Default: localhost:29092)
  • SCRAPY_INPUT_TOPIC - Topic from which the spider(s) consume messages. (Default: ScrapyInput)
  • SCRAPY_OUTPUT_TOPIC - Topic where scraped items are published. (Default: ScrapyOutput)
  • SCRAPY_ERROR_TOPIC - Topic for publishing URLs that failed due to network errors. (Default: ScrapyError)
  • SCRAPY_CONSUMER_CONFIG - Additional configuration options for Kafka consumers (see here)
  • SCRAPY_PRODUCER_CONFIG - Additional configuration options for Kafka producers (see here)
  • SCRAPY_KAFKA_PRODUCER - Key used for partitioning messages in the Kafka producer. (Default: "", which results in round-robin partitioning)
  • SCRAPY_KAFKA_PRODUCER_CALLBACKS - Enable or disable asynchronous message delivery callbacks. (Default: False)
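
As a rough illustration, the settings above could be placed in a project's settings.py as follows. The values mirror the documented defaults. Writing SCRAPY_KAFKA_HOSTS as a single string is an assumption based on the default shown, so check the expected type against your installed version.

```python
# settings.py (sketch): values mirror the documented defaults

# Assumption: hosts are given as one string, as in the documented default
SCRAPY_KAFKA_HOSTS = "localhost:29092"

# Topic the spider consumes URLs from
SCRAPY_INPUT_TOPIC = "ScrapyInput"

# Topic scraped items are published to
SCRAPY_OUTPUT_TOPIC = "ScrapyOutput"

# Topic for URLs that failed due to network errors
SCRAPY_ERROR_TOPIC = "ScrapyError"

# Asynchronous delivery callbacks are off by default
SCRAPY_KAFKA_PRODUCER_CALLBACKS = False
```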

Customisation


Customising deserialisation

You can customize how Kafka messages are deserialized by overriding the process_kafka_message method in your spider class.

This allows for handling custom message formats or data transformations.

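# Import path is an assumption; adjust it to where KafkaListeningSpider lives in your installed version
from kafka_scrapy_connect.spiders import KafkaListeningSpider

class CustomSpider(KafkaListeningSpider):
    def process_kafka_message(self, message, meta={}, headers={}):
        # Custom deserialization logic
        # Return URL, metadata or None if extraction fails
        pass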

⚠️ By default, if no custom process_kafka_message method is provided, the spider's built-in process_kafka_message expects either a JSON payload or a string containing a valid URL. If the payload is a JSON object, the URL is expected under the url key.
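
To make the described default behaviour concrete, here is a standalone sketch of payload parsing that accepts either a JSON object with a url key or a bare URL string. The helper parse_payload is hypothetical and is not the library's implementation. How the library hands the message to process_kafka_message (raw bytes or a message object) is not stated here, so the sketch assumes raw bytes.

```python
import json
from typing import Optional, Tuple


def parse_payload(raw: bytes) -> Optional[Tuple[str, dict]]:
    """Return (url, metadata) from a JSON object or a bare URL string, else None."""
    try:
        text = raw.decode("utf-8").strip()
    except UnicodeDecodeError:
        return None
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        data = text  # not JSON: treat the whole payload as a candidate URL
    if isinstance(data, dict):
        url = data.get("url")
        # Everything except the URL is passed along as metadata
        meta = {k: v for k, v in data.items() if k != "url"}
    else:
        url, meta = data, {}
    # Only accept string values that look like HTTP(S) URLs
    if isinstance(url, str) and url.startswith(("http://", "https://")):
        return url, meta
    return None
```

A custom process_kafka_message override could call a helper like this after extracting the message bytes, and return None when parsing fails.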


Customising Producer & Consumer settings

You can customize producer and consumer settings by providing a dictionary of configuration options in your Scrapy settings under SCRAPY_PRODUCER_CONFIG and SCRAPY_CONSUMER_CONFIG.

# Example SCRAPY_PRODUCER_CONFIG
SCRAPY_PRODUCER_CONFIG = {
    'compression.type': 'gzip',
    'request.timeout.ms': 5000
}

# Example SCRAPY_CONSUMER_CONFIG
SCRAPY_CONSUMER_CONFIG = {
    'fetch.wait.max.ms': 10,
    'max.poll.interval.ms': 600000,
    'auto.offset.reset': 'latest'
}

Custom stats extensions

kafka_scrapy_connect comes with a custom Scrapy stats extension that:

  1. Logs basic scraping statistics every minute (the frequency can be configured with the Scrapy setting KAFKA_LOGSTATS_INTERVAL).
  2. Publishes logging statistics to a Kafka topic at end-of-day (the topic is specified by the Scrapy setting SCRAPY_STATS_TOPIC).
    1. Each summary message is published with a key specifying the summary date (for example 2024-02-27) for easy identification.
  3. Sends a summary payload to the same Kafka topic (SCRAPY_STATS_TOPIC) if the spider is shut down or closed, for example due to a deployment.

To enable this custom extension, disable the standard LogStats extension and modify your settings.py to include the below:

# Kafka topic for capturing stats
SCRAPY_STATS_TOPIC = 'ScrapyStats'

# Disable standard logging extension (use custom kafka_scrapy_connect extension)
EXTENSIONS = {
  "scrapy.extensions.logstats.LogStats": None,
  "kafka_scrapy_connect.extensions.KafkaLogStats": 500
}

An example payload sent to the statistics topic will look like:

{
	"pages_crawled": 3,
	"items_scraped": 30,
	"avg pages/min": 0.4,
	"avg pages/hour": 23.78,
	"avg pages/day": 570.63,
	"avg items/min": 3.96,
	"avg items/hour": 237.76,
	"avg items/day": 5706.3,
	"successful_request_pct": 100.0,
	"http_status_counts": "200: 3",
	"max_memory": 76136448,
	"elapsed_time": 454.23
}
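
For offline analysis, a consumer of the statistics topic could post-process each payload along these lines. This is a sketch under two assumptions: elapsed_time is in seconds (the example rates are roughly consistent with that), and http_status_counts lists multiple entries separated by commas. The function names are hypothetical.

```python
import json


def parse_status_counts(text):
    """Turn a string such as "200: 3" into {200: 3}.

    Assumption: multiple entries, if present, are comma-separated.
    """
    counts = {}
    for part in text.split(","):
        if not part.strip():
            continue
        code, _, count = part.partition(":")
        counts[int(code)] = int(count)
    return counts


def summarise(payload_json):
    """Derive a few figures from one stats payload (a JSON string)."""
    stats = json.loads(payload_json)
    pages = stats["pages_crawled"]
    minutes = stats["elapsed_time"] / 60  # assumes elapsed_time is in seconds
    return {
        "status_counts": parse_status_counts(stats["http_status_counts"]),
        "pages_per_min": pages / minutes if minutes else 0.0,
        "items_per_page": stats["items_scraped"] / pages if pages else 0.0,
    }
```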
