
RabbitMQ for Distributed Scraping


A distributed RabbitMQ scheduler for Scrapy

Installation

Install with pip:

pip install scrapyer-rabbitmq-scheduler

Changelog:

2022/09/06 update 1.1.2

  • Made the scheduler scrapy_rabbitmq_scheduler.scheduler.MqMixCache compatible with the RabbitMQMiddleware middleware
    • Note: since MqMixCache uses a double queue, responses that error out are no longer re-enqueued into the MQ queue, so RabbitMQMiddleware is disabled automatically

2022/09/02 update 1.1.0

  • Added the scheduler scrapy_rabbitmq_scheduler.scheduler.MqMixCache, which combines the MQ queue with an in-memory queue, changing the original SaaS scheduler's breadth-first order to depth-first

2021/11/25 update 1.0.9

  • Fixed connection timeouts in the pipelines module

Usage

Step 1: Add the following settings to your project's settings.py

# Set the project's scheduler
SCHEDULER = "scrapy_rabbitmq_scheduler.scheduler.MqMixCache"

# RabbitMQ connection DSN
RABBITMQ_CONNECTION_PARAMETERS = 'amqp://guest:guest@localhost:5672/?heartbeat=0'

# HTTP status codes that trigger a retry (the request is put back on the queue)
SCHEDULER_REQUEUE_ON_STATUS = [500,503]

# Downloader middleware that acknowledges whether a task succeeded
DOWNLOADER_MIDDLEWARES = {
    'scrapy_rabbitmq_scheduler.middleware.RabbitMQMiddleware': 999
}
# Item pipeline: items are pushed into RabbitMQ
ITEM_PIPELINES = {
    'scrapy_rabbitmq_scheduler.pipelines.RabbitMQPipeline': 300,
}
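The SCHEDULER_REQUEUE_ON_STATUS setting drives a simple decision: responses whose HTTP status appears in the list are put back onto the RabbitMQ queue rather than discarded. A minimal sketch of that decision (the function `should_requeue` is illustrative only, not part of the library's API):

```python
# Illustrative sketch -- should_requeue is NOT a real scrapy_rabbitmq_scheduler
# API; it only mirrors the check the middleware performs per response.
SCHEDULER_REQUEUE_ON_STATUS = [500, 503]

def should_requeue(status, requeue_statuses=tuple(SCHEDULER_REQUEUE_ON_STATUS)):
    """Return True when a response with this status should go back onto MQ."""
    return status in requeue_statuses

print(should_requeue(503))  # True: transient server error, retry later
print(should_requeue(200))  # False: success, message is acknowledged
```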

Step 2: Change your Spider's base class

import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider

class CustomSpider(RabbitSpider):
    name = 'custom_spider'
    queue_name = 'test_urls' # name of the task queue
    items_key = 'test_item' # name of the item queue

    def parse(self, response):
        item = ... # parse item
        yield item
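With the step 1 settings in place, the spider is started with the standard Scrapy CLI and will then consume messages from the `test_urls` queue:

```shell
scrapy crawl custom_spider
```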

Step 3: Write tasks into the RabbitMQ queue

#!/usr/bin/env python
import pika
import settings

connection = pika.BlockingConnection(pika.URLParameters(settings.RABBITMQ_CONNECTION_PARAMETERS))
channel = connection.channel()

queue_key = 'test_urls'

# Read URLs from the file and push them onto the queue
with open('urls.txt') as f:
    for url in f:
        url = url.strip(' \n\r')
        channel.basic_publish(exchange='',
                        routing_key=queue_key,
                        body=url,
                        properties=pika.BasicProperties(
                            content_type='text/plain',
                            delivery_mode=2
                        ))

connection.close()

urls.txt

http://www.baidu.com

Advanced Features

1. Message priority

Priorities range from 0 to 255; the larger the number, the higher the priority.

yield scrapy.Request(url, priority=<priority>)

sets the message's priority directly.
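RabbitMQ priority queues hand higher-numbered messages to consumers first. A rough, pure-Python illustration of that ordering (this simulates delivery order only; it is not the library's code):

```python
import heapq

# Simulate priority delivery: higher priority numbers are delivered first.
# heapq is a min-heap, so the priority is negated to get max-first order.
messages = [
    (0, 'crawl archive page'),
    (200, 'crawl breaking news'),
    (50, 'crawl listing page'),
]

heap = [(-priority, body) for priority, body in messages]
heapq.heapify(heap)

delivery_order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
print(delivery_order)
# ['crawl breaking news', 'crawl listing page', 'crawl archive page']
```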

2. Queue durability

# settings.py
RABBITMQ_DURABLE = True # whether the queue is durable; True = durable, False = not durable; default True

3. Message acknowledgement

# settings.py
RABBITMQ_CONFIRM_DELIVERY = True # whether messages require delivery confirmation; True = required, False = not; default True

4. Message delay

scrapy-rabbitmq-scheduler implements message delay with the rabbitmq-delayed-message-exchange plugin, so the plugin must be installed and enabled before using this feature: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
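On a typical installation, after downloading the plugin release (.ez file) matching your RabbitMQ version into the plugins directory, the plugin is enabled with:

```shell
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
```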

Enable the delay queue in the spider:

# -*- coding: utf-8 -*-
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider
from example.items import ArticleItem


class CcidcomSpider(RabbitSpider):
    ....
    # queue name
    queue_name = 'ccidcom'
    # whether this is a delay queue
    is_delay_queue = True
    ...

When is_delay_queue is set to True, delays are enabled automatically.

Using delays

yield scrapy.Request('http://www.ccidcom.com/', callback=self.parse, meta={'_delay_time': 10000})

Add _delay_time to the request's meta to specify the delay in milliseconds; it takes effect automatically.
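The delayed-message plugin reads the delay from an `x-delay` header (in milliseconds) on messages published to an `x-delayed-message` exchange, so presumably the scheduler translates `_delay_time` into that header. A hypothetical sketch of that mapping (`delay_headers` is illustrative, not a library API):

```python
# Hypothetical helper -- not part of scrapy_rabbitmq_scheduler. It shows how a
# request's meta could be turned into AMQP headers for the
# rabbitmq-delayed-message-exchange plugin, which reads 'x-delay' (ms).
def delay_headers(meta):
    """Map a request meta dict to AMQP message headers for delayed delivery."""
    headers = {}
    if '_delay_time' in meta:
        headers['x-delay'] = int(meta['_delay_time'])
    return headers

print(delay_headers({'_delay_time': 10000}))  # {'x-delay': 10000}
print(delay_headers({}))                      # {}
```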

TODO

  • Support delayed requests
  • Add task persistence configuration
