资讯详情

14、pyspider框架

一、pyspider框架介绍

1.简介

pyspider 这是一个强大的原因python爬虫系统的实现。

  • 纯python的
  • 强大的webui,支持脚本编辑、任务监控、项目管理和结果检查
  • 后台数据支持,MySQL,MongoDB,Reids,SQLite,Elasticsearch,PostgreSQL和SQLAlchemy
  • 消息队列支持,RabbitMQ,Beanstalk,Redis以及Kombu
  • 支持任务优先、定期、失败重试等调度方案
  • 分布式架构,抓取js页面
  • 支持Python2和3

2.安装

‘pip install pyspider’

ubuntu

如果使用ubuntu,请先运行sudo apt update 再运行sudo apt upgrade 更新

apt-get install python python-dev python-distribute python-pip \ libcurl4-openssl-dev libxml2-dev libxslt1-dev python-lxml \ libssl-dev zlib1g-dev 

删除wsgidav 然后重新安装2.4.1版本

windows

1.下载 pyspider

方式一:pip install pyspider

方法二(建议):pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspider

2.运行 pyspider 操作报告如下错误:

 Deprecated option 'domaincontroller': use 'http_authenticator.domain_controller' instead. 

解决方案:删除wsgidav 然后重新安装2.4.1版本

(1)where wsgidav找到wsgidav删除位置wsgidav.exe

(2)pip install wsgidav==2.4.1 安装2.4.1版本

3.再次运行 pyspider 报错

cannot import name 'DispatcherMiddleware'

解决方案:

#卸载 pip uninstall werkzeug #安装指定版本 pip install werkzeug==0.16.0

4.再次运行 pyspider 运行成功

二、pyspider框架入门

1.启动pyspider

安装好pyspider之后,创建项目文件夹存储相关文件,进入文件夹后运行pyspider默认情况下,命令将运行web通过5000端口监控服务端,http://localhost:5000即可访问pyspider的web管理界面,它看起来是这样的:

2.创建一个项目

点击右边的Create按钮,在弹出框里,填写项目名称,和起始url

创建完成后,窗口右侧为代码编辑器,您可以在这里编写爬虫脚本。

3.第一个脚本

from pyspider.libs.base_handler import *  class Handler(BaseHandler):     crawl_config = { 
             }      @every(minutes=24 * 60)     def on_start(self):         self.crawl('http://scrapy.org/', callback=self.index_page)      @config(age=10 * 24 * 60 * 60)     def index_page(self, response):         for each in response.doc('a[href^="http"]').items(): self.crawl(each.attr.href, callback=self.detail_page) @config(priority=2) def detail_page(self, response): return { 
          "url": response.url, "title": response.doc('title').text(), } 
  • def on_start(selef)是脚本的入口。当你点击run按钮时,它会被调用。
  • self.crawl(url, callback=self.index_page)是最重要的接口。它会添加一个新的待爬取任务。大部分的设置可以通过self.crawl的参数去指定。
  • def index_page(self, response)接收一个response对象。response.doc是一个pyquery对象,它有一个类似jQuery选择器一样的接口,去解析页面元素。
  • def detail_page(self, response)返回一个字典结果。这个结果默认会被写入resultdb(结果数据库)。你可以通过复写on_result(self, result)方法来按照你自己的需求处理结果。
  • @every(minutes=24 * 60)这个装饰器会告诉调度器,on_start方法将会每天被调用。
  • @config(age=10 * 24 * 60 * 60)指定当self.crawl爬取的页面类型为index_page(当callback=self.index_page)时的age参数的默认值。参数age可以通过self.crawl(url, age=10*24*60*60)crawl_config来指定,直接在方法参数中指定具有最高的优先级。
  • age=10*24*60*60告诉调度器抛弃10天内爬取过的请求。默认情况下,相同URL不会被爬取两次,甚至你修改了代码。对于初学者来说,第一次运行项目然后修改它,在第二次运行项目的情况非常常见,但是它不会再次爬行(阅读itag了解解决方案)
  • @config(priority=2)标志着,detail page将会被优先爬取。

你可以通过点击绿色的run按钮,一步一步的调试你的脚本。切换到follows面板,点击play按钮前进。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EYzmCHn1-1614400262039)(images/run_one_step.png)]

4.运行项目

  1. 保存脚本

  2. 返回后台首页,找到你的项目

  3. 改变statusDEBUGRUNNING

  4. 点击按钮run

三、架构

1.概述

下图显示了pyspider体系结构及其组件的概述,以及系统内部发生的数据流的概要。

组件之间通过消息队列进行连接。每一个组件都包含消息队列,都在它们自己的进程/线程中运行,并且是可以替换的。这意味者,当处理速度缓慢时,可以通过启动多个processor实例来充分利用多核cpu,或者进行分布式部署。

2.组件

Scheduler

调度器从processor返回的新任务队列中接收任务。判断是新任务还是需要重新爬取。通过优先级对任务进行分类,并且利用令牌桶算法将任务发送给fetcher. 处理周期任务,丢失的任务和失败的任务,并且稍后重试。

以上都可以通过self.crawl进行设置。

注意,在当前的调度器实现中,只允许一个调度器。

Fetcher

Fetcher的职责是获取web页面然后把结果发送给processor。请求method, headers, cookies, proxy, etag 等,都可以设置。

Processor

处理器的职责是运行用户编写的脚本,去解析和提取信息。您的脚本在无限制的环境中运行。尽管我们有各种各样的工具(如风PyQuery)供您提取信息和连接,您可以使用任何您想使用的方法来处理响应。

处理器会捕捉异常和记录日志,发送状态(任务跟踪)和新的任务给调度器,发送结果给Result Worker

Result Worker

Result WorkerPorcess接收结果。Pyspider有一个内置的结果处理器将数据保存到resultdb.根据您的需要重写它以处理结果。

WebUI

WebUI是一个面向所有内容的web前端。它包含:

  • 脚本编辑器,调试器
  • 项目管理器
  • 任务监控程序
  • 结果查看器和导出

也许webui是pyspider最吸引人的地方。使用这个强大的UI,您可以像pyspider一样一步一步地调试脚本。启动停止项目。找到哪个项目出错了,什么请求失败了,然后使用调试器再试一次。

Data flow

pyspider中的数据流如上图所示:

  1. 当您按下WebUI上的Run按钮时,每个脚本都有一个名为on_start的回调。作为项目的入口,on_start产生的新任务将会提交给调度器。
  2. 调度程序使用一个数据URI将这个on_start任务分派为要获取的普通任务。
  3. Fetcher对它发出一个请求和一个响应(对于数据URI,它是一个假的请求和响应,但与其他正常任务没有区别),然后送给处理器。
  4. 处理器调用on_start方法并生成一些要抓取的新URL。处理器向调度程序发送一条消息,说明此任务已完成,并通过消息队列将新任务发送给调度程序(在大多数情况下,这里没有on_start的结果。如果有结果,处理器将它们发送到result_queue)。
  5. 调度程序接收新任务,在数据库中查找,确定任务是新的还是需要重新抓取,如果是,将它们放入任务队列。按顺序分派任务。
  6. 这个过程重复(从步骤3开始),直到WWW死后才停止;-)。调度程序将检查定期任务,以抓取最新数据。

3.关于任务

任务是调度的基本单元。

基本原理
  • 任务由它的taskid(默认:md5(url), 可以通过重写get_taskid(self, task)方法来修改)来区分。
  • 不同项目之间的任务是隔离的
  • 一个任务有4个状态:
    • active
    • failed
    • success
    • bad-not used
  • 只有处于active状态的任务会被调度
  • 任务按优先次序执行。
调度
新任务

当一个新的任务(从未见过)出现:

  • 如果设置了exetime但没有到达,它将被放入一个基于时间的队列中等待。
  • 否则将被接受。

当任务已经在队列中:

  • 除非(force_update)强制更新,否则忽略

当一个完成过的任务出现时:

  • 如果age设置,且last_crawl_time + age < now 它将会被接受,否则抛弃。
  • 如果itag设置,且它不等于上一次的值,它会被接受,否则抛弃。
重试

当请求错误或脚本错误发生时,任务将在默认情况下重试3次。

第一次重试将在30秒、1小时、6小时、12小时后每次执行,任何更多的重试将推迟24小时。retry_delay是一个指定重试间隔的字典。这个字典中的元素是{retried:seconds}, 如果未指定,则使用特殊键:''空字符串指定的默认推迟时间。

例如默认的retry_delay声明如下:

class MyHandler(BaseHandler):
    retry_delay = {
        0: 30,
        1: 1*60*60,
        2: 6*60*60,
        3: 12*60*60,
        '': 24*60*60
    }

4.关于项目

在大多数情况下,项目是为一个网站编写的一个脚本。

  • 项目是独立的,但是可以用from Projects import other_project将另一个项目作为模块导入
  • 一个项目有5个状态:TODO,STOP,CHECKING,DEBUGRUNNING
    • TODO - 刚创建,正在编写脚本
    • 如果您希望项目停止(= =),可以将其标记为STOP。
    • CHECKING - 当正在运行的项目被修改时,为了防止未完成的修改,项目状态将被设置为自动检查。
    • DEBUG/RUNNING - 这两种状态对爬虫没有区别。但最好在第一次运行时将其标记为DEBUG,然后在检查后将其更改为RUNNING
  • 爬行速率由rateburst并采用令牌桶算法进行控制。
    • rate - 一秒钟内有多少请求
    • burst - 考虑这种情况,RATE/BURST=0.1/3,这意味着蜘蛛每10秒抓取1个页面。所有任务都已完成,Project将每分钟检查最后更新的项目。假设找到3个新项目,pyspider将“爆发”并爬行3个任务,而不等待3*10秒。但是,第四个任务需要等待10秒。
  • 若要删除项目,请将“组”设置为“删除”,将“状态”设置为“停止”,然后等待24小时。
回调on_finished

您可以在项目中重写on_finished方法,当task_queue变为0时将触发该方法。

第一种情况:当您启动一个项目来抓取一个包含100个页面的网站时,当100个页面被成功抓取或重试失败时,on_finished回调将被触发。

第二种情况:带有auto_recrawl任务的项目永远不会触发on_finished回调,因为当其中有auto_recrawl任务时,时间队列永远不会变为0。

第三种情况:带有@every修饰方法的项目将在每次新提交的任务完成时触发on_finished回调。

5. 脚本环境

变量
  • self.project_name
  • self.project 当前项目的详细信息
  • self.response
  • self.task

官方文档:http://docs.pyspider.org/en/latest/

关于脚本
  • handler的名称并不重要,但您需要至少一个继承basehandler的类
  • 可以设置第三个参数来获取任务对象:def callback(self、response、task)
  • 默认情况下,非200响应不会提交回调。可以使用@catch_status_code_error来处理非200响应
关于环境
  • logging,print以及异常会被捕获。
  • 你可以通过from projects import some_project将其他项目当做模块导入。
Web view
  • 以浏览器呈现的方式查看页面(大约)
HTML view
  • 查看当前回调的HTML(索引页、细节页等)
Follows view
  • 查看可从当前回调进行的回调
  • 索引页面跟随视图将显示可执行的详细页面回调。
Messages view
  • 显示self.send_message发送的消息
Enable CSS Selector Helper
  • 启用Web视图的CSS选择器帮助程序。它获取您单击的元素的CSS选择器,然后将其添加到脚本中。

6.处理结果

从WebUI下载和查看数据很方便,但可能不适用于计算机。

使用resultdb

虽然resultdb仅用于结果预览,但不适用于大规模存储。

但是,如果您想从resultdb中获取数据,那么有一些使用数据库API的简单案例可以帮助您连接和选择数据。

from pyspider.database import connect_database
resultdb = connect_database("<your resutldb connection url>")
for project in resultdb.projects:
    for result in resultdb.select(project):
        assert result['taskid']
        assert result['url']
        assert result['result']

result['result']是由你编写的脚本中的RETURN语句提交的对象。

使用ResultWorker

在生产环境中,你可能希望将pyspider连接到你的系统的处理管道,而不是将结果存储到resultdb中。强烈建议重写ResultWorker。

from pyspider.result import ResultWorker

class MyResultWorker(ResultWorker):
    def on_result(self, task, result):
        assert task['taskid']
        assert task['project']
        assert task['url']
        assert result
        # your processing code goes here

result是你脚本中return语句提交的对象。

您可以将这个脚本(例如,my_result_worker.py)放在启动pyspider的文件夹中。

result_worker子命令添加参数:

pyspider result_worker --result-cls=my_result_worker.MyResultWorker

或者,如果你使用配置文件

{
  ...
  "result_worker": {
    "result_cls": "my_result_worker.MyResultWorker"
  }
  ...
}

为了兼容性,将存储在数据库中的结果编码为JSON。强烈建议您设计自己的数据库,并覆盖上面描述的ResultWorker。

关于结果的小技巧
想要在回调中返回多个结果?

resultdb会通过taskid(url)对结果进行去重,后面的结果会覆盖前面的。

一个解决方案是使用send_message API为每个结果生成一个伪taskid。

def detail_page(self, response):
    for li in response.doc('li').items():
        self.send_message(self.project_name, { 
        
            ...
        }, url=response.url+"#"+li('a.product-sku').text())

def on_message(self, project, msg):
    return msg

四、V2EX网站

创意工作者的社区。讨论编程、设计、硬件、游戏等令人激动的话题。 网址首页:https://www.v2ex.com/

爬取内容

主页:https://www.v2ex.com/go/python

代码实现

  1. pyspider操作的创建运行调试操作

  2. 目标数据提取逻辑

  3. mysql的存储数据

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Project: V2EX

from pyspider.libs.base_handler import *
import  pymysql
import  logging
logger=logging.getLogger(__name__)
class Handler(BaseHandler):
    crawl_config = { 
        
    }
    def __init__(self):
        self.conn=pymysql.connect(host="localhost",user="root",password="123789",database="v2ex",charset="utf8")
        self.cursor=self.conn.cursor()
        self.page_num=1
    
    @every(minutes=24 * 60)
    def on_start(self):
        print("执行on_start")
        self.crawl('https://www.v2ex.com/go/python', callback=self.index_page, validate_cert=False)

    @config(age=10 * 24 * 60 * 60)
    def index_page(self, response):
        print("执行index_page", response)
        topic_list = response.etree.xpath('//a[@class="topic-link"]/@href')
        print(topic_list)
        for topic_link in topic_list:
            # 完整路径的拼接
            topic_link = "https://www.v2ex.com" + topic_link
            print(topic_link)
            self.crawl(topic_link, callback=self.detail_page, validate_cert=False)

        #请求下一页
        self.page_num+=1
        next_page="https://www.v2ex.com/go/python?p={}".format(self.page_num)
        self.crawl(next_page, callback=self.index_page, validate_cert=False)


    @config(priority=2)
    def detail_page(self, response):
        print("执行detail_page", response)
        # 1.topic
        topic = response.etree.xpath("//h1/text()")[0]
        print("topic", topic)

        # 2.topic_content
        topic_content = response.etree.xpath('string(//div[@class="topic_content"])')
        print(topic_content)

        topic_url=response.url
        try:
            sql="insert into topic(topic,topic_content,topic_url) values(%s,%s,%s)"
            self.cursor.execute(sql,(topic,topic_content,topic_url))
            self.conn.commit()
        except Exception  as  e:
            self.conn.rollback()
            logger.warning("写入topic数据错误",e)

        # 3.reply_content
        reply_list = response.etree.xpath('//div[@class="reply_content"]')
        print("reply_list", reply_list)
        #查找topic_id
        sql="select id from topic where topic_url=%s"
        self.cursor.execute(sql, (topic_url))
        topic_id=self.cursor.fetchone()
        print("topic_id",topic_id)
        if topic_id:
            topic_id=topic_id[0]

            for reply in reply_list:
                reply_content = reply.xpath('string(.)')
                print(reply_content)
                try:
                    sql = "insert into reply_content(topic_id,reply_content,topic_url) values(%s,%s,%s)"
                    self.cursor.execute(sql, (topic_id, reply_content, topic_url))
                    self.conn.commit()
                except Exception  as  e:
                    self.conn.rollback()
                    logger.warning("写入reply_content数据错误", e)


        return { 
        
            "topic": topic,
            "topic_content": topic_content,
        }

topic数据表

reply_content数据表

五、API参考

1.self.crawl

官方文档地址:http://docs.pyspider.org/en/latest/apis/self.crawl/

url:统一资源定位符 callback:回调函数 method:请求方法 params:请求参数 data;form表单数据 age:过期时间 priority:权限,优先级 exetime:执行时间 retries:重试次数 itag:页面标识 auto_recrawl:自动重爬

user_agent:用户代理 headers:请求头 cookies:cookies connect_timeout:连接超时 timeout:响应超时 allow_redirects:允许重定向 validate_cert:证书验证 proxy:代理 fetch_type:抓取方式 save:参数传递

self.crawl(url, **kwargs)

self.crawl是告诉pyspider应该爬取哪个url的主要接口程序。

参数
url

将被爬取的url或url列表

callback

解析返回响应的方法

def on_start(self):
    self.crawl('http://scrapy.org/', callback=self.index_page)

接下来的参数可选

age

任务的有效期。在此期间,页面将被视为未修改。默认:-1(不会再次爬取)

@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
    ...

每个被index_page解析的页面将被视为10天内不会发生改变。如果您在10天内提交的任务,它将被丢弃。

priority

待调度任务的优先级越高优先级越大。默认:0

def index_page(self):
    self.crawl('http://www.example.org/page2.html', callback=self.index_page)
    self.crawl('http://www.example.org/233.html', callback=self.detail_page,
               priority=1)

页面233.html将会比page2.html优先爬取。使用此参数执行广度优先算法从而减少队列中任务的数量,控制内存资源。

exetime

任务的执行时间(时间戳)。默认:0(立即运行)

import time
def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               exetime=time.time()+30*60)

页面将在30分钟后执行。

retries

失败时的重试次数。默认:3

itag

来自前沿页面的标记,用于显示任务的潜在修改。它将与其最后一个值进行比较,更改后重新爬取。默认:None

def index_page(self, response):
    for item in response.doc('.item').items():
        self.crawl(item.find('a').attr.url, callback=self.detail_page,
                   itag=item.find('.update-time').text())

在上面的例子中,.update-time作为itag。如果没有更改,请求将被丢弃。

或者如果希望重新启动所有的任务,你可以通过Handler.crawl_config来设置itag,指定脚本的版本。

class Handler(BaseHandler):
    crawl_config = { 
        
        'itag': 'v223'
    }

修改脚本后更改itag的值,然后再次单击run按钮。如果之前没有设置,也没有关系。

auto_recrawl

当启用时,任务会在每个生命周期重新爬取。默认:False

def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               age=5*60*60, auto_recrawl=True)

每5个小时页面会被重新爬取。

method

使用的http请求方法。默认:GET

params

要附加到URL的URL参数字典。

def on_start(self):
    self.crawl('http://httpbin.org/get', callback=self.callback,
               params={ 
        'a': 123, 'b': 'c'})
    self.crawl('http://httpbin.org/get?a=123&b=c', callback=self.callback)

这两个请求相同。

data

附加到请求的请求体。如果提供字典,则将进行表单编码。

def on_start(self):
    self.crawl('http://httpbin.org/post', callback=self.callback,
               method='POST', data={ 
        'a': 123, 'b': 'c'})
files

要上传的文件,格式为字典{field: {filename: 'content'}}

user_agent

请求的用户代理

headers

要发送的请求头字典。

cookies

要附加到请求的cookie字典

timeout

获取页面的最长时间(秒)。默认:120

allow_redirects

遵循30x重定向,默认:True

taskid

唯一标识任务的id,默认是MD5检查代码的URL,可以被def get_taskid(self, task)方法覆盖

import json
from pyspider.libs.utils import md5string
def get_taskid(self, task):
    return md5string(task['url']+json.dumps(task['fetch'].get('data', '')))

默认情况下,只有url的md5值作为taskid,上面的代码将post请求的数据添加为taskid的一部分。

save

一个传递给回调方法的对象,可以通过response.save访问

def on_start(self):
    self.crawl('http://www.example.org/', callback=self.callback,
               save={ 
        'a': 123})

def callback(self, response):
    return response.save['a']

123将在callback里返回。

2.Response

响应对象的属性。

Response.url

最终的URL

Response.text

unicode编码的响应文本。

如果Response.encoding为None,charset模块可用,内容编码将会被猜测。

Response.content

二进制格式的响应内容。

Response.doc

响应内容的PyQuery对象。

参考PyQuery的文档:https://pythonhosted.org/pyquery/

Response.etree

响应内容的lxml对象。

Response.json

响应的json编码内容(如果有的话)。

3.self.send_message

self.send_message(project, msg, [url])

发送消息给其他项目。可以被def on_message(self, project, message)接收。

  • project - 项目名
  • msg - 任何可以json序列化的对象
  • url - 如果有想同的任务结果将被覆盖。在默认情况下send_message会共享当前taskid. 更改它可以实现一个响应返回多个结果。
def detail_page(self, response):
    for i, each in enumerate(response.json['products']):
        self.send_message(self.project_name, { 
        
                "name": each['name'],
                'price': each['prices'],
             }, url="%s#%s" % (response.url, i))

def on_message(self, project, msg):
    return msg

4.@catch_status_code_error

非200的响应将被视为fetch失败,不会传递给回调函数。使用此装饰器覆盖此特性。

def on_start(self):
    self.crawl('http://httpbin.org/status/404', self.callback)

@catch_status_code_error  
def callback(self, response):

标签: 电容160v223k聚酯薄膜400v223电容器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台