一、pyspider框架介绍
1.简介
pyspider 这是一个强大的原因python爬虫系统的实现。
- 纯python的
- 强大的webui,支持脚本编辑、任务监控、项目管理和结果检查
- 后台数据支持,MySQL,MongoDB,Reids,SQLite,Elasticsearch,PostgreSQL和SQLAlchemy
- 消息队列支持,RabbitMQ,Beanstalk,Redis以及Kombu
- 支持任务优先、定期、失败重试等调度方案
- 分布式架构,抓取js页面
- 支持Python2和3
2.安装
‘pip install pyspider’
ubuntu
如果使用ubuntu
,请先运行sudo apt update
再运行sudo apt upgrade
更新
apt-get install python python-dev python-distribute python-pip \ libcurl4-openssl-dev libxml2-dev libxslt1-dev python-lxml \ libssl-dev zlib1g-dev
删除wsgidav
然后重新安装2.4.1
版本
windows
1.下载 pyspider
方式一:pip install pyspider
方法二(建议):pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspider
2.运行 pyspider
操作报告如下错误:
Deprecated option 'domaincontroller': use 'http_authenticator.domain_controller' instead.
解决方案:删除wsgidav
然后重新安装2.4.1
版本
(1)where wsgidav
找到wsgidav删除位置wsgidav.exe
(2)pip install wsgidav==2.4.1
安装2.4.1版本
3.再次运行 pyspider
报错
cannot import name 'DispatcherMiddleware'
解决方案:
#卸载 pip uninstall werkzeug #安装指定版本 pip install werkzeug==0.16.0
4.再次运行 pyspider
运行成功
二、pyspider框架入门
1.启动pyspider
安装好pyspider
之后,创建项目文件夹存储相关文件,进入文件夹后运行pyspider
默认情况下,命令将运行web通过5000端口监控服务端,http://localhost:5000
即可访问pyspider的web管理界面,它看起来是这样的:
2.创建一个项目
点击右边的Create
按钮,在弹出框里,填写项目名称,和起始url
。
创建完成后,窗口右侧为代码编辑器,您可以在这里编写爬虫脚本。
3.第一个脚本
from pyspider.libs.base_handler import * class Handler(BaseHandler): crawl_config = {
} @every(minutes=24 * 60) def on_start(self): self.crawl('http://scrapy.org/', callback=self.index_page) @config(age=10 * 24 * 60 * 60) def index_page(self, response): for each in response.doc('a[href^="http"]').items(): self.crawl(each.attr.href, callback=self.detail_page) @config(priority=2) def detail_page(self, response): return {
"url": response.url, "title": response.doc('title').text(), }
def on_start(selef)
是脚本的入口。当你点击run
按钮时,它会被调用。self.crawl(url, callback=self.index_page)
是最重要的接口。它会添加一个新的待爬取任务。大部分的设置可以通过self.crawl
的参数去指定。def index_page(self, response)
接收一个response对象。response.doc
是一个pyquery
对象,它有一个类似jQuery
选择器一样的接口,去解析页面元素。def detail_page(self, response)
返回一个字典结果。这个结果默认会被写入resultdb
(结果数据库)。你可以通过复写on_result(self, result)
方法来按照你自己的需求处理结果。@every(minutes=24 * 60)
这个装饰器会告诉调度器,on_start
方法将会每天被调用。@config(age=10 * 24 * 60 * 60)
指定当self.crawl
爬取的页面类型为index_page
(当callback=self.index_page
)时的age参数的默认值。参数age
可以通过self.crawl(url, age=10*24*60*60)
和crawl_config
来指定,直接在方法参数中指定具有最高的优先级。age=10*24*60*60
告诉调度器抛弃10天内爬取过的请求。默认情况下,相同URL不会被爬取两次,甚至你修改了代码。对于初学者来说,第一次运行项目然后修改它,在第二次运行项目的情况非常常见,但是它不会再次爬行(阅读itag
了解解决方案)@config(priority=2)
标志着,detail page
将会被优先爬取。
你可以通过点击绿色的run
按钮,一步一步的调试你的脚本。切换到follows
面板,点击play
按钮前进。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EYzmCHn1-1614400262039)(images/run_one_step.png)]
4.运行项目
-
保存脚本
-
返回后台首页,找到你的项目
-
改变
status
为DEBUG
或RUNNING
-
点击按钮
run
三、架构
1.概述
下图显示了pyspider体系结构及其组件的概述,以及系统内部发生的数据流的概要。
组件之间通过消息队列进行连接。每一个组件都包含消息队列,都在它们自己的进程/线程中运行,并且是可以替换的。这意味者,当处理速度缓慢时,可以通过启动多个processor
实例来充分利用多核cpu,或者进行分布式部署。
2.组件
Scheduler
调度器从processor
返回的新任务队列中接收任务。判断是新任务还是需要重新爬取。通过优先级对任务进行分类,并且利用令牌桶算法
将任务发送给fetcher
. 处理周期任务,丢失的任务和失败的任务,并且稍后重试。
以上都可以通过self.crawl
进行设置。
注意,在当前的调度器实现中,只允许一个调度器。
Fetcher
Fetcher
的职责是获取web页面然后把结果发送给processor
。请求method, headers, cookies, proxy, etag
等,都可以设置。
Processor
处理器的职责是运行用户编写的脚本,去解析和提取信息。您的脚本在无限制的环境中运行。尽管我们有各种各样的工具(如风PyQuery)供您提取信息和连接,您可以使用任何您想使用的方法来处理响应。
处理器会捕捉异常和记录日志,发送状态(任务跟踪)和新的任务给调度器,发送结果给Result Worker
Result Worker
Result Worker
从Porcess
接收结果。Pyspider有一个内置的结果处理器将数据保存到resultdb
.根据您的需要重写它以处理结果。
WebUI
WebUI是一个面向所有内容的web前端。它包含:
- 脚本编辑器,调试器
- 项目管理器
- 任务监控程序
- 结果查看器和导出
也许webui是pyspider最吸引人的地方。使用这个强大的UI,您可以像pyspider一样一步一步地调试脚本。启动停止项目。找到哪个项目出错了,什么请求失败了,然后使用调试器再试一次。
Data flow
pyspider中的数据流如上图所示:
- 当您按下WebUI上的Run按钮时,每个脚本都有一个名为on_start的回调。作为项目的入口,
on_start
产生的新任务将会提交给调度器。 - 调度程序使用一个数据URI将这个on_start任务分派为要获取的普通任务。
- Fetcher对它发出一个请求和一个响应(对于数据URI,它是一个假的请求和响应,但与其他正常任务没有区别),然后送给处理器。
- 处理器调用on_start方法并生成一些要抓取的新URL。处理器向调度程序发送一条消息,说明此任务已完成,并通过消息队列将新任务发送给调度程序(在大多数情况下,这里没有on_start的结果。如果有结果,处理器将它们发送到result_queue)。
- 调度程序接收新任务,在数据库中查找,确定任务是新的还是需要重新抓取,如果是,将它们放入任务队列。按顺序分派任务。
- 这个过程重复(从步骤3开始),直到WWW死后才停止;-)。调度程序将检查定期任务,以抓取最新数据。
3.关于任务
基本原理
- 任务由它的
taskid
(默认:md5(url)
, 可以通过重写get_taskid(self, task)
方法来修改)来区分。 - 不同项目之间的任务是隔离的
- 一个任务有4个状态:
- active
- failed
- success
- bad-not used
- 只有处于
active
状态的任务会被调度 - 任务按优先次序执行。
调度
新任务
当一个新的任务(从未见过)出现:
- 如果设置了exetime但没有到达,它将被放入一个基于时间的队列中等待。
- 否则将被接受。
当任务已经在队列中:
- 除非(
force_update
)强制更新,否则忽略
当一个完成过的任务出现时:
- 如果
age
设置,且last_crawl_time + age < now
它将会被接受,否则抛弃。 - 如果
itag
设置,且它不等于上一次的值,它会被接受,否则抛弃。
重试
当请求错误或脚本错误发生时,任务将在默认情况下重试3次。
第一次重试将在30秒、1小时、6小时、12小时后每次执行,任何更多的重试将推迟24小时。retry_delay
是一个指定重试间隔的字典。这个字典中的元素是{retried:seconds}
, 如果未指定,则使用特殊键:''
空字符串指定的默认推迟时间。
例如默认的retry_delay
声明如下:
class MyHandler(BaseHandler):
retry_delay = {
0: 30,
1: 1*60*60,
2: 6*60*60,
3: 12*60*60,
'': 24*60*60
}
4.关于项目
在大多数情况下,项目是为一个网站编写的一个脚本。
- 项目是独立的,但是可以用
from Projects import other_project
将另一个项目作为模块导入 - 一个项目有5个状态:
TODO
,STOP
,CHECKING
,DEBUG
和RUNNING
TODO
- 刚创建,正在编写脚本- 如果您希望项目停止(= =),可以将其标记为STOP。
CHECKING
- 当正在运行的项目被修改时,为了防止未完成的修改,项目状态将被设置为自动检查。DEBUG
/RUNNING
- 这两种状态对爬虫没有区别。但最好在第一次运行时将其标记为DEBUG
,然后在检查后将其更改为RUNNING
。
- 爬行速率由
rate
和burst
并采用令牌桶算法进行控制。rate
- 一秒钟内有多少请求burst
- 考虑这种情况,RATE/BURST=0.1/3,这意味着蜘蛛每10秒抓取1个页面。所有任务都已完成,Project将每分钟检查最后更新的项目。假设找到3个新项目,pyspider将“爆发”并爬行3个任务,而不等待3*10秒。但是,第四个任务需要等待10秒。
- 若要删除项目,请将“组”设置为“删除”,将“状态”设置为“停止”,然后等待24小时。
回调on_finished
您可以在项目中重写on_finished方法,当task_queue变为0时将触发该方法。
第一种情况:当您启动一个项目来抓取一个包含100个页面的网站时,当100个页面被成功抓取或重试失败时,on_finished
回调将被触发。
第二种情况:带有auto_recrawl
任务的项目永远不会触发on_finished
回调,因为当其中有auto_recrawl
任务时,时间队列永远不会变为0。
第三种情况:带有@every
修饰方法的项目将在每次新提交的任务完成时触发on_finished
回调。
5. 脚本环境
变量
self.project_name
self.project
当前项目的详细信息self.response
self.task
官方文档:http://docs.pyspider.org/en/latest/
关于脚本
handler
的名称并不重要,但您需要至少一个继承basehandler
的类- 可以设置第三个参数来获取任务对象:
def callback(self、response、task)
- 默认情况下,非200响应不会提交回调。可以使用
@catch_status_code_error
来处理非200响应
关于环境
logging
,print
以及异常会被捕获。- 你可以通过
from projects import some_project
将其他项目当做模块导入。
Web view
- 以浏览器呈现的方式查看页面(大约)
HTML view
- 查看当前回调的HTML(索引页、细节页等)
Follows view
- 查看可从当前回调进行的回调
- 索引页面跟随视图将显示可执行的详细页面回调。
Messages view
- 显示
self.send_message
发送的消息
Enable CSS Selector Helper
- 启用Web视图的CSS选择器帮助程序。它获取您单击的元素的CSS选择器,然后将其添加到脚本中。
6.处理结果
从WebUI下载和查看数据很方便,但可能不适用于计算机。
使用resultdb
虽然resultdb仅用于结果预览,但不适用于大规模存储。
但是,如果您想从resultdb中获取数据,那么有一些使用数据库API的简单案例可以帮助您连接和选择数据。
from pyspider.database import connect_database
resultdb = connect_database("<your resutldb connection url>")
for project in resultdb.projects:
for result in resultdb.select(project):
assert result['taskid']
assert result['url']
assert result['result']
result['result']
是由你编写的脚本中的RETURN
语句提交的对象。
使用ResultWorker
在生产环境中,你可能希望将pyspider连接到你的系统的处理管道,而不是将结果存储到resultdb中。强烈建议重写ResultWorker。
from pyspider.result import ResultWorker
class MyResultWorker(ResultWorker):
def on_result(self, task, result):
assert task['taskid']
assert task['project']
assert task['url']
assert result
# your processing code goes here
result
是你脚本中return语句提交的对象。
您可以将这个脚本(例如,my_result_worker.py)放在启动pyspider的文件夹中。
为result_worker
子命令添加参数:
pyspider result_worker --result-cls=my_result_worker.MyResultWorker
或者,如果你使用配置文件
{
...
"result_worker": {
"result_cls": "my_result_worker.MyResultWorker"
}
...
}
为了兼容性,将存储在数据库中的结果编码为JSON。强烈建议您设计自己的数据库,并覆盖上面描述的ResultWorker。
关于结果的小技巧
想要在回调中返回多个结果?
resultdb会通过taskid(url)对结果进行去重,后面的结果会覆盖前面的。
一个解决方案是使用send_message
API为每个结果生成一个伪taskid。
def detail_page(self, response):
for li in response.doc('li').items():
self.send_message(self.project_name, {
...
}, url=response.url+"#"+li('a.product-sku').text())
def on_message(self, project, msg):
return msg
四、V2EX网站
创意工作者的社区。讨论编程、设计、硬件、游戏等令人激动的话题。 网址首页:https://www.v2ex.com/
爬取内容
主页:https://www.v2ex.com/go/python
代码实现
-
pyspider操作的创建运行调试操作
-
目标数据提取逻辑
-
mysql的存储数据
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Project: V2EX
from pyspider.libs.base_handler import *
import pymysql
import logging
logger=logging.getLogger(__name__)
class Handler(BaseHandler):
crawl_config = {
}
def __init__(self):
self.conn=pymysql.connect(host="localhost",user="root",password="123789",database="v2ex",charset="utf8")
self.cursor=self.conn.cursor()
self.page_num=1
@every(minutes=24 * 60)
def on_start(self):
print("执行on_start")
self.crawl('https://www.v2ex.com/go/python', callback=self.index_page, validate_cert=False)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
print("执行index_page", response)
topic_list = response.etree.xpath('//a[@class="topic-link"]/@href')
print(topic_list)
for topic_link in topic_list:
# 完整路径的拼接
topic_link = "https://www.v2ex.com" + topic_link
print(topic_link)
self.crawl(topic_link, callback=self.detail_page, validate_cert=False)
#请求下一页
self.page_num+=1
next_page="https://www.v2ex.com/go/python?p={}".format(self.page_num)
self.crawl(next_page, callback=self.index_page, validate_cert=False)
@config(priority=2)
def detail_page(self, response):
print("执行detail_page", response)
# 1.topic
topic = response.etree.xpath("//h1/text()")[0]
print("topic", topic)
# 2.topic_content
topic_content = response.etree.xpath('string(//div[@class="topic_content"])')
print(topic_content)
topic_url=response.url
try:
sql="insert into topic(topic,topic_content,topic_url) values(%s,%s,%s)"
self.cursor.execute(sql,(topic,topic_content,topic_url))
self.conn.commit()
except Exception as e:
self.conn.rollback()
logger.warning("写入topic数据错误",e)
# 3.reply_content
reply_list = response.etree.xpath('//div[@class="reply_content"]')
print("reply_list", reply_list)
#查找topic_id
sql="select id from topic where topic_url=%s"
self.cursor.execute(sql, (topic_url))
topic_id=self.cursor.fetchone()
print("topic_id",topic_id)
if topic_id:
topic_id=topic_id[0]
for reply in reply_list:
reply_content = reply.xpath('string(.)')
print(reply_content)
try:
sql = "insert into reply_content(topic_id,reply_content,topic_url) values(%s,%s,%s)"
self.cursor.execute(sql, (topic_id, reply_content, topic_url))
self.conn.commit()
except Exception as e:
self.conn.rollback()
logger.warning("写入reply_content数据错误", e)
return {
"topic": topic,
"topic_content": topic_content,
}
topic数据表
reply_content数据表
五、API参考
1.self.crawl
官方文档地址:http://docs.pyspider.org/en/latest/apis/self.crawl/
url:统一资源定位符 callback:回调函数 method:请求方法 params:请求参数 data;form表单数据 age:过期时间 priority:权限,优先级 exetime:执行时间 retries:重试次数 itag:页面标识 auto_recrawl:自动重爬
user_agent:用户代理 headers:请求头 cookies:cookies connect_timeout:连接超时 timeout:响应超时 allow_redirects:允许重定向 validate_cert:证书验证 proxy:代理 fetch_type:抓取方式 save:参数传递
self.crawl(url, **kwargs)
self.crawl
是告诉pyspider应该爬取哪个url的主要接口程序。
参数
url
将被爬取的url或url列表
callback
解析返回响应的方法
def on_start(self):
self.crawl('http://scrapy.org/', callback=self.index_page)
接下来的参数可选
age
任务的有效期。在此期间,页面将被视为未修改。默认:-1(不会再次爬取)
@config(age=10 * 24 * 60 * 60)
def index_page(self, response):
...
每个被index_page
解析的页面将被视为10天内不会发生改变。如果您在10天内提交的任务,它将被丢弃。
priority
待调度任务的优先级越高优先级越大。默认:0
def index_page(self):
self.crawl('http://www.example.org/page2.html', callback=self.index_page)
self.crawl('http://www.example.org/233.html', callback=self.detail_page,
priority=1)
页面233.html
将会比page2.html
优先爬取。使用此参数执行广度优先算法从而减少队列中任务的数量,控制内存资源。
exetime
任务的执行时间(时间戳)。默认:0(立即运行)
import time
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
exetime=time.time()+30*60)
页面将在30分钟后执行。
retries
失败时的重试次数。默认:3
itag
来自前沿页面的标记,用于显示任务的潜在修改。它将与其最后一个值进行比较,更改后重新爬取。默认:None
def index_page(self, response):
for item in response.doc('.item').items():
self.crawl(item.find('a').attr.url, callback=self.detail_page,
itag=item.find('.update-time').text())
在上面的例子中,.update-time
作为itag。如果没有更改,请求将被丢弃。
或者如果希望重新启动所有的任务,你可以通过Handler.crawl_config
来设置itag
,指定脚本的版本。
class Handler(BaseHandler):
crawl_config = {
'itag': 'v223'
}
修改脚本后更改itag的值,然后再次单击run按钮。如果之前没有设置,也没有关系。
auto_recrawl
当启用时,任务会在每个生命周期重新爬取。默认:False
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
age=5*60*60, auto_recrawl=True)
每5个小时页面会被重新爬取。
method
使用的http请求方法。默认:GET
params
要附加到URL的URL参数字典。
def on_start(self):
self.crawl('http://httpbin.org/get', callback=self.callback,
params={
'a': 123, 'b': 'c'})
self.crawl('http://httpbin.org/get?a=123&b=c', callback=self.callback)
这两个请求相同。
data
附加到请求的请求体。如果提供字典,则将进行表单编码。
def on_start(self):
self.crawl('http://httpbin.org/post', callback=self.callback,
method='POST', data={
'a': 123, 'b': 'c'})
files
要上传的文件,格式为字典{field: {filename: 'content'}}
user_agent
请求的用户代理
headers
要发送的请求头字典。
cookies
要附加到请求的cookie字典
timeout
获取页面的最长时间(秒)。默认:120
allow_redirects
遵循30x
重定向,默认:True
taskid
唯一标识任务的id,默认是MD5检查代码的URL,可以被def get_taskid(self, task)
方法覆盖
import json
from pyspider.libs.utils import md5string
def get_taskid(self, task):
return md5string(task['url']+json.dumps(task['fetch'].get('data', '')))
默认情况下,只有url的md5值作为taskid,上面的代码将post请求的数据添加为taskid的一部分。
save
一个传递给回调方法的对象,可以通过response.save访问
def on_start(self):
self.crawl('http://www.example.org/', callback=self.callback,
save={
'a': 123})
def callback(self, response):
return response.save['a']
123
将在callback里返回。
2.Response
响应对象的属性。
Response.url
最终的URL
Response.text
unicode编码的响应文本。
如果Response.encoding
为None,charset
模块可用,内容编码将会被猜测。
Response.content
二进制格式的响应内容。
Response.doc
响应内容的PyQuery对象。
参考PyQuery的文档:https://pythonhosted.org/pyquery/
Response.etree
响应内容的lxml对象。
Response.json
响应的json编码内容(如果有的话)。
3.self.send_message
self.send_message(project, msg, [url])
发送消息给其他项目。可以被def on_message(self, project, message)
接收。
- project - 项目名
- msg - 任何可以json序列化的对象
- url - 如果有想同的任务结果将被覆盖。在默认情况下
send_message
会共享当前taskid
. 更改它可以实现一个响应返回多个结果。
def detail_page(self, response):
for i, each in enumerate(response.json['products']):
self.send_message(self.project_name, {
"name": each['name'],
'price': each['prices'],
}, url="%s#%s" % (response.url, i))
def on_message(self, project, msg):
return msg
4.@catch_status_code_error
非200的响应将被视为fetch失败,不会传递给回调函数。使用此装饰器覆盖此特性。
def on_start(self):
self.crawl('http://httpbin.org/status/404', self.callback)
@catch_status_code_error
def callback(self, response):