RabbitMQ for Distributed Scraping
A distributed RabbitMQ scheduler for Scrapy.
Installation
Install with pip:
```
pip install scrapyer-rabbitmq-scheduler
```
Changelog:
2022/09/06, version 1.1.2
- Made the scheduler scrapy_rabbitmq_scheduler.scheduler.MqMixCache compatible with the RabbitMQMiddleware middleware
- Note: because MqMixCache uses a dual-queue design, responses that error out are no longer pushed back onto the MQ queue, so RabbitMQMiddleware is automatically disabled
2022/09/02, version 1.1.0
- Added the scheduler scrapy_rabbitmq_scheduler.scheduler.MqMixCache, which pairs the MQ queue with an in-memory queue and changes the previous scheduler's breadth-first crawl order to depth-first
2021/11/25, version 1.0.9
- Fixed a connection-timeout issue in the pipelines
Usage
Step 1: Add the following settings to your project's settings.py:
```python
# Use this project's scheduler
SCHEDULER = "scrapy_rabbitmq_scheduler.scheduler.MqMixCache"
# RabbitMQ connection DSN
RABBITMQ_CONNECTION_PARAMETERS = 'amqp://guest:guest@localhost:5672/?heartbeat=0'
# HTTP status codes that trigger a retry (the request is pushed back onto the queue)
SCHEDULER_REQUEUE_ON_STATUS = [500, 503]
# Downloader middleware that acknowledges messages for successfully handled tasks
DOWNLOADER_MIDDLEWARES = {
    'scrapy_rabbitmq_scheduler.middleware.RabbitMQMiddleware': 999
}
# Item pipeline: scraped items are pushed into RabbitMQ
ITEM_PIPELINES = {
    'scrapy_rabbitmq_scheduler.pipelines.RabbitMQPipeline': 300,
}
```
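Before starting a crawl it can help to confirm the DSN above is reachable. Below is a minimal, hypothetical sanity check using pika directly (the same client used in Step 3); it is not part of this library:
```python
import pika

# assumption: the same DSN as RABBITMQ_CONNECTION_PARAMETERS above
params = pika.URLParameters('amqp://guest:guest@localhost:5672/?heartbeat=0')
connection = pika.BlockingConnection(params)  # raises pika.exceptions.AMQPConnectionError on failure
print('RabbitMQ connection OK')
connection.close()
```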
Step 2: Make your spider inherit from RabbitSpider:
```python
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider


class CustomSpider(RabbitSpider):
    name = 'custom_spider'
    queue_name = 'test_urls'  # name of the task (request) queue
    items_key = 'test_item'   # name of the item queue

    def parse(self, response):
        item = ...  # parse item
        yield item
```
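For a fuller picture of the parse step, here is a hedged sketch against a hypothetical article-list page; the spider name, queue names, and CSS selectors are illustrative only. Requests yielded from parse() go back through the RabbitMQ scheduler like any other request:
```python
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider


class ArticleSpider(RabbitSpider):
    name = 'article_spider'
    queue_name = 'article_urls'  # hypothetical task queue
    items_key = 'article_items'  # hypothetical item queue

    def parse(self, response):
        # one item per article block (selectors are made up)
        for article in response.css('div.article'):
            yield {
                'title': article.css('h2::text').get(),
                'link': article.css('a::attr(href)').get(),
            }
        # pagination: the follow-up request is scheduled through RabbitMQ
        next_page = response.css('a.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)
```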
Step 3: Push tasks into the RabbitMQ queue:
```python
#!/usr/bin/env python
import pika
# run this from the project directory so the settings module resolves
import settings

connection = pika.BlockingConnection(
    pika.URLParameters(settings.RABBITMQ_CONNECTION_PARAMETERS))
channel = connection.channel()
queue_key = 'test_urls'

# read URLs from a file and publish each one to the queue
with open('urls.txt') as f:
    for url in f:
        url = url.strip(' \n\r')
        channel.basic_publish(exchange='',
                              routing_key=queue_key,
                              body=url,
                              properties=pika.BasicProperties(
                                  content_type='text/plain',
                                  delivery_mode=2  # persistent message
                              ))
connection.close()
```
urls.txt:
```
http://www.baidu.com
```
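To confirm the URLs actually landed in the queue, a small follow-up sketch (again using plain pika, not part of the library) can inspect the queue depth:
```python
#!/usr/bin/env python
import pika
import settings

connection = pika.BlockingConnection(
    pika.URLParameters(settings.RABBITMQ_CONNECTION_PARAMETERS))
channel = connection.channel()
# passive=True only inspects the queue; it raises if the queue does not exist
result = channel.queue_declare(queue='test_urls', passive=True)
print('pending tasks:', result.method.message_count)
connection.close()
```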
Advanced features
1. Message priority
- Message priorities range from 0 to 255; the larger the number, the higher the priority. You can set a message's priority directly on the request (see the fuller sketch below):
```python
yield scrapy.Request(url, priority=100)  # any value from 0 to 255
```
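As a fuller, hypothetical sketch (spider name, queue name, and URLs are made up), listing pages can be given a higher priority so they are consumed from the queue before detail pages:
```python
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider


class PrioritySpider(RabbitSpider):
    name = 'priority_spider'
    queue_name = 'priority_urls'

    def parse(self, response):
        # larger number = higher priority, so the listing page jumps the queue
        yield scrapy.Request('http://example.com/list',
                             callback=self.parse, priority=100)
        yield scrapy.Request('http://example.com/detail',
                             callback=self.parse_detail, priority=10)

    def parse_detail(self, response):
        yield {'url': response.url}
```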
2. Queue durability
```python
# settings.py
RABBITMQ_DURABLE = True  # whether queues are durable: True = durable, False = transient; default True
```
3. Message acknowledgement
```python
# settings.py
RABBITMQ_CONFIRM_DELIVERY = True  # whether messages require delivery confirmation; default True
```
4. Message delay
The message delay in scrapy-rabbitmq-scheduler is implemented with the rabbitmq-delayed-message-exchange plugin, so you must install and enable that plugin before using this feature (e.g. `rabbitmq-plugins enable rabbitmq_delayed_message_exchange`):
rabbitmq-delayed-message-exchange: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
Enable the delay queue in the spider:
```python
# -*- coding: utf-8 -*-
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider
from example.items import ArticleItem


class CcidcomSpider(RabbitSpider):
    ...
    # queue name
    queue_name = 'ccidcom'
    # whether this is a delay queue
    is_delay_queue = True
    ...
```
Setting is_delay_queue to True automatically enables message delays.
Sending a delayed request:
```python
yield scrapy.Request('http://www.ccidcom.com/', callback=self.parse, meta={'_delay_time': 10000})
```
Add _delay_time to the request's meta with the delay in milliseconds, and the delay takes effect automatically; see the polling sketch below.
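Putting is_delay_queue and _delay_time together, a hedged sketch (hypothetical spider and queue names) of a spider that re-crawls the same page every 10 seconds:
```python
import scrapy
from scrapy_rabbitmq_scheduler.spiders import RabbitSpider


class PollingSpider(RabbitSpider):
    name = 'polling_spider'
    queue_name = 'polling_urls'
    is_delay_queue = True  # required for _delay_time to take effect

    def parse(self, response):
        yield {'url': response.url, 'size': len(response.text)}
        # re-enqueue the same URL with a 10000 ms delay;
        # dont_filter=True stops Scrapy's dupefilter dropping the repeat request
        yield scrapy.Request(response.url, callback=self.parse,
                             dont_filter=True,
                             meta={'_delay_time': 10000})
```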
TODO
- Support delayed requests
- Add task persistence configuration
File details
Details for the file scrapyer-rabbitmq-scheduler-1.1.2.tar.gz.
File metadata
- Download URL: scrapyer-rabbitmq-scheduler-1.1.2.tar.gz
- Upload date:
- Size: 11.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 9c11abf6abf48bc11671b38277db6b2171d10bf3be162e183723dc0e260aee26
MD5 | 89f568a0428a4c495391dbe08976f479
BLAKE2b-256 | 44e6147e083982cd7a411de4ada1de3cb78fb3d048004c2e309301048bbf409b