Skip to main content

一个 爬虫框架

Project description

Palp 文档

简介

Palp 是一个爬虫框架
整体使用方式和 scrapy 类似,但有以下特点

  • 同一个项目可以存放多个不同的 spider,spider 拥有各自的 settings
  • 无感分布式,不需要内网,只需要 redis,分布式与非分布式仅继承的类不同
  • 自动 cookiejar 仅需要使用 keep_cookie 即可
  • 自带 requests、httpx 两种请求器,并可自定义请求器(同时需要自定义解析器)
  • 自动 join url

但有以下注意点:

  • 默认不对 item、request 进行去重
  • 去重一般为普通去重(默认),也有严格去重(需开启),严格去重时,会有锁、分布式锁

安装

pip install palp

创建项目与爬虫

创建项目和爬虫都会去判断文件是否存在
创建爬虫时,会自动移动到 spiders 文件夹下(查找为 3 个深度)

palp create -p xxx
palp create -s xxx 1

spider

目前提供 2 个 spider

  • LocalSpider:本地爬虫
  • DistributiveSpider:分布式爬虫

LocalSpider

本地爬虫,不支持分布式

创建

【命令】

palp create -s baidu 1

【结构】

"""
Create on 2022-12-12 16:21:10.159221
----------
@summary:
----------
@author:
"""
import palp


class BaiduSpider(palp.LocalSpider):
    spider_name = "baidu"   # 自定义的名字
    spider_domains = []  # 允许通过的域名,默认不限制
    spider_settings = None  # 字典形式或导入形式的设置

    def start_requests(self) -> None:
        """
        起始函数

        :return:
        """
        for i in range(10):
            yield palp.RequestGet("https://www.baidu.com")

    def parse(self, request, response) -> None:
        """
        解析函数

        :param request:
        :param response:
        :return:
        """
        print(response)


if __name__ == '__main__':
    BaiduSpider(thread_count=1).start()

DistributiveSpider

分布式爬虫,需要在设置中增加 redis 连接 【命令】

palp create -s baidu 2

【结构】
和普通的一毛一样,只是继承类不同,无成本切换

"""
Create on 2022-12-12 16:21:10.159221
----------
@summary:
----------
@author:
"""
import palp


class BaiduSpider(palp.DistributiveSpider):
    spider_name = "baidu"   # 自定义的名字
    spider_domains = []  # 允许通过的域名,默认不限制
    spider_settings = None  # 字典形式或导入形式的设置

    def start_requests(self) -> None:
        """
        起始函数

        :return:
        """
        for i in range(10):
            yield palp.RequestGet("https://www.baidu.com")

    def parse(self, request, response) -> None:
        """
        解析函数

        :param request:
        :param response:
        :return:
        """
        print(response)


if __name__ == '__main__':
    BaiduSpider(thread_count=1).start()

数据库连接

通过 quickdb 模块,内置了 redis、mongo、mysql、kafka、postgresql 的连接,引用方式如下:

from palp import conn
conn.redis_conn # 接原始命令
conn.pg_conn    # 基于 sqlalchemy 魔改版本
conn.mysql_conn # 基于 sqlalchemy 魔改版本
conn.mongo_conn # 基于 mongo_conn 添加了部分方法,如 iter
conn.kafka_conn # kafka_conn.send 即可

注意:内置了 SpiderRecycleMiddleware 中间件,创建的连接会自动关闭

具体的使用可以看 quickdb 模块,这里强烈推荐 quickdb 针对 sqlalchemy 的数据库模板导出功能:

from quickdb import PostgreSQLAlchemyEngine

pg_conn = PostgreSQLAlchemyEngine(
    host=settings.PG_HOST,
    port=settings.PG_PORT,
    db=settings.PG_DB,
    user=settings.PG_USER,
    pwd=settings.PG_PWD,
    **settings.PG_CONFIG
)
pg_conn.reverse_table_model(path='./models.py', tables=['xxx'])  # path 可以自动生成

Middleware

SpiderMiddleware

这是项目中间件,可通过 spider 访问到对应的属性

  • spider_start:爬虫启动时干什么
  • spider_error:爬虫出错时干什么
  • spider_close:爬虫结束时干什么(必定会执行)

【结构】

import palp
from loguru import logger


class SpiderMiddleware(palp.SpiderMiddleware):
    def spider_start(self, spider) -> None:
        """
        spider 开始时的操作

        :param spider:
        :return:
        """

    def spider_error(self, spider, exception: Exception) -> None:
        """
        spider 出错时的操作

        :param spider:
        :param exception: 错误的详细信息
        :return:
        """
        logger.exception(exception)

    def spider_close(self, spider) -> None:
        """
        spider 结束的操作

        :param spider:
        :return:
        """

【添加到设置中】 注意:这里的 1 代表顺序

SPIDER_MIDDLEWARE = {
    1: 'middlewares.middleware.SpiderMiddleware',
}

RequestMiddleware

请求中间件

  • request_in:请求创建时干什么
  • request_error:请求出错时干什么(默认重试 3 次 settings.REQUEST_RETRY_TIMES)
  • request_failed:请求失败时干什么(3 次后还失败,走这里)
  • request_close:请求结束时干什么(执行完毕后走这里)

注意:

  • request_failed 和 request_close 只有走其中一个
  • 不要的请求可直接抛出 DropRequestException 错误
  • 请求可以原地修改
  • request_error 可以判断错误进行修改,或者直接 return 新的请求,旧请求自动丢弃
  • request_close 可以判断响应是否符合预期,或者直接 return 新的请求,旧请求自动丢弃

提示:分布式爬虫时,设置开启以下两个选项,即可自动保存错误请求,并存放 redis,下次请求自动继续

  • REQUEST_FAILED_SAVE = True # 分布式时保存失败的请求(重试之后仍然失败的)
  • REQUEST_RETRY_FAILED = True # 分布式时启动重试失败请求

【结构】

import palp
from typing import Union
from palp import settings
from loguru import logger
from palp.network.request import Request

class RequestMiddleware(palp.RequestMiddleware):
    def request_in(self, spider, request) -> None:
        """
        请求进入时的操作

        :param spider:
        :param request:
        :return:
        """
        if settings.REQUEST_PROXIES_TUNNEL_URL:
            request.proxies = {
                'http': settings.REQUEST_PROXIES_TUNNEL_URL,
                'https': settings.REQUEST_PROXIES_TUNNEL_URL,
            }

    def request_error(self, spider, request, exception: Exception) -> Union[Request, None]:
        """
        请求出错时的操作

        :param spider:
        :param request: 该参数可返回(用于放弃当前请求,并发起新请求)
        :param exception: 错误的详细信息
        :return: [Request, None]
        """
        logger.exception(exception)

        return

    def request_failed(self, spider, request) -> None:
        """
        超过最大重试次数时的操作

        :param spider:
        :param request:
        :return:
        """
        logger.warning(f"失败的请求:{request}")

    def request_close(self, spider, request, response) -> Union[Request, None]:
        """
        请求结束时的操作

        :param spider:
        :param request: 该参数可返回(用于放弃当前请求,并发起新请求)
        :param response:
        :return: [Request, None]
        """
        return

【添加到设置中】 注意:这里的 1 代表顺序

REQUEST_MIDDLEWARE = {
    1: "middlewares.middleware.RequestMiddleware",
}

添加代理

注意:这里基于默认的 ResponseDownloaderByRequests 请求器(requests)

class RequestMiddleware(palp.RequestMiddleware):
    def request_in(self, spider, request) -> None:
        """
        请求进入时的操作

        :param spider:
        :param request:
        :return:
        """
        
        # 给所有 url 都添加代理
        if settings.REQUEST_PROXIES_TUNNEL_URL:
            request.proxies = {
                'http': settings.REQUEST_PROXIES_TUNNEL_URL,
                'https': settings.REQUEST_PROXIES_TUNNEL_URL,
            }
           
        # 指定域名添加代理
        allow_domains = ['xxx']
        
        if settings.REQUEST_PROXIES_TUNNEL_URL:
            if request.domain in allow_domains:
                request.proxies = {
                    'http': settings.REQUEST_PROXIES_TUNNEL_URL,
                    'https': settings.REQUEST_PROXIES_TUNNEL_URL,
                }

Pipeline

处理 item,清洗、入库,含有以下方法

  • pipeline_in:数据进入时,一般用作清洗
  • pipeline_save:数据保存方法,一种保存写一个清晰明了
  • pipeline_error:数据保存出错时,默认重试次数 3 ,可通过 settings.PIPELINE_RETRY_TIMES 设置
  • pipeline_failed:数据保存出错超过最大次数
  • pipeline_close:无数据时,整个 pipeline 结束时运行!!!

注意:

  • 数据都是原地修改的,不需要传递
  • 默认启动的数据处理线程是 5,可通过 settings.ITEM_THREADS 调整
  • 默认是单条传递,有多条传递需求的可通过 PIPELINE_ITEM_BUFFER 调整,这样传递 item 的就是列表
  • 不需要的 item 可以通过 DropItemException 进行丢弃,但如果是多条直接在 item 列表移除就行,丢弃的话整个都会被丢掉

【结构】

import palp
from loguru import logger


class Pipeline(palp.Pipeline):
    def pipeline_in(self, spider, item) -> None:
        """
        入库之前的操作,一般是清洗

        :param spider:
        :param item:
        :return:
        """

    def pipeline_save(self, spider, item) -> None:
        """
        入库

        :param spider:
        :param item: 启用 item_buffer 将会是 List[item] 反之为 item
        :return:
        """
        logger.info(item)

    def pipeline_error(self, spider, item, exception: Exception) -> None:
        """
        入库出错时的操作

        :param spider:
        :param item: 启用 item buffer 时是 List[item]
        :param exception: 错误的详细信息
        :return:
        """
        logger.exception(exception)

    def pipeline_failed(self, spider, item) -> None:
        """
        超过最大重试次数时的操作

        :param spider:
        :param item: 启用 item buffer 时是 List[item]
        :return:
        """
        logger.warning(f"失败的 item:{item}")

    def pipeline_close(self, spider) -> None:
        """
        spider 结束时的操作

        :param spider:
        :return:
        """

【添加到设置中】 注意:这里的 1 代表顺序

PIPELINE = {
    1: "pipelines.pipeline.Pipeline",
}

Item

通过 yield item 将数据发送到 pipeline 进行保存
Item 提供了两种

  • Item
  • StrictItem

Item

懒人 item 不需要定义字段,但是最好有多个就写不同的名字做区分
【创建】

import palp


class Item(palp.Item):
    """
        通用、懒人 item
    """

【使用】

yield Item(**{'xxx':'yyy'})

StrictItem

严格 item,需要定义哪些字段被允许通过

import palp

class StrictItem(palp.StrictItem):
    """
        严格 item
    """
    # 此处需要定义数据库字段
    # name = palp.Field()

【使用】

yield StrictItem(**{'xxx':'yyy'})

Request

提供了以下方法

  • RequestGet
  • RequestPost
  • RequestDelete
  • RequestOptions
  • RequestHead
  • RequestPatch

除了 requests、httpx 参数常用外,还可以通过 command 指定其它参数,方便自定义请求器时使用

框架的参数如下

  • downloader:局部下载器(不需要实例化)
  • downloader_parser:局部下载解析器(不需要实例化)
  • filter_repeat:是否过滤请求(默认不过滤)
  • keep_session:是否保持 session 自动 cookie,tls 连接
  • keep_cookie:自动 cookieJar
  • new_session:创建新的 session
  • callback:回调函数(必须有)
  • cookie_jar:cookieJar(用户不需要手动操作,但是可以通过该参数获取值)
  • priority:使用优先级队列时的优先级(默认就是优先级队列,默认优先级 settings.DEFAULT_QUEUE_PRIORITY)
  • command:自定义请求器时的传参字典

注意:

  • 请求过滤默认是不开启的,需要开启 settings.FILTER_REQUEST(普通过滤)
  • 在开启普通过滤的情况下,可以选择开启 settings.STRICT_FILTER(严格过滤)加锁,严重影响性能,不推荐
  • keep_session 虽然可以提高部分效率,但不建议使用,因为既然是爬虫多线程,隧道代理,但是你只有一个连接,很容易就判断出来
  • keep_session 时,检测的话,很容易出现断连,那所有爬虫歇菜,那就需要 new_session=True 重新创建
  • keep_cookie 推荐,类似 scrapy 手动使用 cookiejar,这里只要一直 keep_cookie=True 就行
  • 请求队列默认是 优先级队列,想修改通过 settings.REQUEST_QUEUE_MODE 修改

Response

请求响应 提供了自带的以下方法

  • xpath
  • css
  • re
  • re_first
  • bs4(默认解析器:lxml)
  • urljoin:合并 url,可以手动使用(默认会自动处理)

【示例】

def parse(self, request, response) -> None:
    """
    解析函数

    :param request:
    :param response:
    :return:
    """
    response.bs4.find()
    response.re()
    response.re_first()
    response.xpath().extract()
    response.xpath().extract_first()

M、其它使用技巧

1、增量爬虫

增量无非就是判断,建议拿主键直接判断,下面有两个简单的例子

1.1 数据库判断

即通过自己保存的数据,进行判断列表页,已出现的 url 则为已抓取,那么后续则不需要抓取
【案例】

is_repeat = False   # 重复标志

for i in response.xpath('//ul[@class="list_con"]//li'):
    notice_url = response.urljoin(i.xpath('./a/@href').extract_first())
    
    # 判断是否重复
    if conn_company_notice.find_one({'notice_url': notice_url}):
        is_repeat = True
        break
    
    yield palp.RequestGet(url=notice_url, callback=self.parse_content)

# 翻页
if not is_repeat and page_now < page_total:
    pass

1.2 redis 判断

Palp 默认的分布式去重有:

  • redis set 去重
  • redis bloom 去重(默认)

对应的过滤器如下:

  • RedisSetFilter:对应 redis set 去重
  • RedisBloomFilter:对应 redis bloom 去重

使用时需开启以下设置,作用是开启去重并持久化

FILTER_REQUEST = True
PERSISTENCE_REQUEST_FILTER = True

【案例】以 redis bloom 去重 为例
注意:虽然本身会做去重请求,但是之所以这样写,是为了避免再去翻页浪费时间

from palp.filter import RequestRedisBloomFilter

is_repeat = False   # 重复标志

for i in response.xpath('//ul[@class="list_con"]//li'):
    notice_url = response.urljoin(i.xpath('./a/@href').extract_first())
    
    req = palp.RequestGet(url=notice_url, callback=self.parse_content)

    # 判断是否重复
    if RequestRedisBloomFilter().is_repeat(req):
        break
    
    yield req

# 翻页
if not is_repeat and page_now < page_total:
    pass

2、快速二次请求

基于上一次请求的基础上进行二次请求
有两种方法:

  • 原地修改
  • request 的 to_dict() 方法获取字典后修改

2.1、原地修改

def parse(self, request, response) -> None:
    request.xxx = xxx   # 修改

    yield request

2.2、to_dict()

request_dict = request.to_dict()
request_dict[xxx] = xxx # 修改

yield palp.Request(**request_dict)

3、个性化爬虫

比如批次爬虫,根据 redis 获取任务,其实很简单的(不内置批次爬取爬虫)

关于表

可以设置多种状态:已抓取,待抓取,队列中,等等
大概流程如下:

  • 不论你什么需求是,只要在 start_requests 函数中,设置获取方法
  • 随后设置为队列中状态
  • 在 pipeline 中设置修改已爬取状态

关于 redis

可以设置最简单的 list、或者不重复的 set 或者优先级的 zset
只要在 start_requests 函数中 直接执行对应的 pop 方法不就行了

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

palp-0.0.97.tar.gz (53.9 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page