它在网站上捕获是开发者的典型用例。无论是副例子,项目还是你正在形成一家数据公司,似乎都有必要捕获数据。
例如,如果你想创建一个价格比较网站,你将从各种商业网站获取价格信息;或者你需要创建一个自动识别的电子商务,并在亚马逊上找到价格 。还有很多类似的场景。
你有没有获得所有页面信息?
您可以在短时间内启动您的项目,以便在此服务器上使用,因为Python 需要。
为了使代码正常运行,您需要安装它[1]。部分系统可能已经预装了它。然后您还需要使用pip install
安装所有必要的库。
pipinstallrequestsbeautifulsoup4aiohttpnumpy
如果了解操作的基本知识,可以跳过理论部分直接进入实际情况。
它于描述能力的术语的。
当您选择按顺序向网站发出请求时,您可以一次发出请求,等待结果返回,然后发出下一个请求。
但是,无论是否,您也同时发送多个请求,并在它们返回时处理的结果,这种方式的提升请求的效果是非常显着的。与请求相比,不同的速度可以顺序并行运行(多个 CPU) ,比现在很快 -- 稍后会详细介绍。
了解顺序的能力。我们需要按顺序处理任务之间的区别。假设有五个任务,每个任务都需要 10 完成。在处理它们时,完成五项任务的时间是 50 秒;同时处理时间只需要 10 秒完成。
我们还可以通过在多个过程中执行任务,以更短的速度完成更多的任务。
这里有几种不同的并行化方式:不同multiprocessing
和asyncio
。asyncio
,这是一个包含 Python 该模块为使用协程提供了功能丰富的基础设施。
同时,它意味着更复杂的系统和代码,所以请考虑在使用场景中是否利大于弊。
-
在更短的时间内完成更多的工作
-
你可以把最多的中文时间花在其他请求上
-
更难开发和调试
-
可能的条件
-
安全任务需要检查和使用
-
意外希望增加程序的阻止
-
因此,需要设置相似的功能
-
如果有太多的小型请求,可能会变成 DDoS 攻击
*同时,在释放所有请求时要小心*
在理解选择之前,我们有必要的区别 IO 密集型与 CPU 密集型之间的区别asyncio
。multiprocessing
[2]是使用 async/await 两个处理器上的语法操作代码库。
IO 密集型将受到 I/O 影响而缓慢运行。在我们的程序中,主要指网络请求。
例如 CPU 密集型意味着程序会由于 CPU 压力导致运行缓慢——数学计算。
CPU 大量的 CPU 类型会在多个进程中提升效率。 /O 密集型的情况,情况可能不是这样。
asyncio
但如果您的数据采集主要受 I/O 限制,因此我们选择了multiprocessing
。
我们scrapeme.live
作为示例开始,这是一个用于测试的电子商务网站。
首先,我们开始追踪它们的版本。因此,它们将保持不变。
通过访问目标主页,我们发现它有 48 个子页面。是测试环境,这些页面不会很快发生变化,我们会使用到以下子特征:
base_url = "https://scrapeme.live/shop/page" pages = range(1, 49) # max page (48) + 1
现在,从提取基础数据。使用来自内容的requests.get
内容,然后BeautifulSoup
解析它),但为起见,我们不会在这里详细介绍。
import requests from bs4 import BeautifulSoup def extract_details(page): # concatenate page number to base URL response = requests.get(f"{base_url}/{page}/") soup = BeautifulSoup(response.text, "html.parser") pokemon_list = [] for pokemon in soup.select(".product"): # loop each product pokemon_list.append({ "id": pokemon.find(class_="add_to_cart_button").get("data-product_id"), "name": pokemon.find("h2").text.strip(), "price": pokemon.find(class_="price").text.strip(), "url": pokemon.find(class_="woocommerce-loop-product__link").get("href"), }) return pokemon_list
extract_details
函数将获取一个页码并将其连接起来,创建URL。获取并创建产品后用于返回的结果列表。这是一个用于显示结果的内容列表,这是一个使用结果的必需细节。
我们需要为每个页面运行的函数,获取所有结果,并存储它们。
import csv # modified to avoid running all the pages unintentionally pages = range(1, 3) def store_results(list_of_lists): pokemon_list = sum(list_of_lists, []) # flatten lists with open("pokemon.csv", "w") as pokemon_file: # get dictionary keys for the CSV header fieldnames = pokemon_list[0].keys() file_writer = csv.DictWriter(pokemon_file, fieldnames=fieldnames) file_writer.writeheader() file_writer.writerows(pokemon_list) list_of_lists = [ extract_details(page) for page in pages ] store_results(list_of_lists)
运行上面的代码将获得获取的代码,抽取产品的两个顺序CSV 32个),你把它们存储在一个名为pokemon.csv
的文件中。store_results
函数不影响或可以并行的抓取。
列表是为了提醒大家writerows
将其命名为自己的结果,这list_of_lists
是为了提醒大家,即使它是我们必须的结果。
输出 CSV 文件的例子:
ID | 姓名 | 价格 | 网址 |
---|---|---|---|
759 | 大头龙 | 63.00 英镑 | https://scrapeme.live/shop/Bulbasaur/ |
729 | 常春藤龙 | 87.00 英镑 | https://scrapeme.live/shop/Ivysaur/ |
730 | 维纳斯 | 105.00 英镑 | https://scrapeme.live/shop/Venusaur/ |
731 | 小火龙 | 48.00 英镑 | https://scrapeme.live/shop/Charmander/ |
732 | 变色龙 | 165.00 英镑 | https://scrapeme.live/shop/Charmeleon/ |
如果您要为每一个脚本(48)运行,则通过生成一个包含 755 个产品的 CSV 文件,并花费大约 30 秒。
time python script.py real 0m31,806s user 0m1,936s sys 0m0,073s
我们应该知道,我们可以的请求。
其他应该运行执行,它也涉及一些得名。但是这不是线性的改进。
为此,我们将使用上面提到的asyncio
。它允许我们在事件循环中的同一个线程上运行多个任务(就像 Javascript 一样)。运行到一个函数,并在允许的时候将发生切换在我们的例子中,HTTP 请求切换是允许的。
我们将开始看到一个示例。请注意,并且脚本应该需要我们一次运行。我们不能直接调用main
。需要asyncio
知道它是一个需要执行的异步函数。
import asyncio async def main(): print("Hello ...") await asyncio.sleep(1) print("... World!") asyncio.run(main())
time python script.py Hello ... ... World! real 0m1,054s user 0m0,045s sys 0m0,008s
示例,示例案例来运行打印使用一个函数秒。每个单元将一秒扩展一秒,然后一个文本。如果它们运行大约需要。asyncio
,只需要
如前所述,纯I/O型任务,要求执行得当——睡眠不是,但它的例子很重要。
我们需要创建一个辅助函数,它会列出main
一个列表,然后我们先打印一条。然后以调用该函数次,并将每个消息存储在一个任务中的关键部分是执行并等待所有任务完成。这就是[4]个任务的事情。
import asyncio async def demo_function(i): await asyncio.sleep(1) print(f"Hello {i}") async def main(): tasks = [ demo_function(i) for i in range(0, 100) ] await asyncio.gather(*tasks) asyncio.run(main())
就像那般的一百条消息和一个!执行时间。完美的
我们需要将数据通知结果同时下数据采集情况跟踪。并在所有请求完成后存储它们并返回请求列表。的数据遗嘱。
我们的第一次不会有可能同时遇到问题,所以在使用每个 URL 的时候可能会运行它的请求情况......好吧,它会同时执行所有这些请求。给服务器带来巨大的负载,并可能会损害您的计算机。
requests
不支持即开即用的,因此我们将使用http使用没有化解。requests
aiohttp
import asyncio import aiohttp from bs4 import BeautifulSoup async def extract_details(page, session): # similar to requests.get but with a different syntax async with session.get(f"{base_url}/{page}/") as response: # notice that we must await the .text() function soup = BeautifulSoup(await response.text(), "html.parser") # [...] same as before return pokemon_list async def main(): # create an aiohttp session and pass it to each function execution async with aiohttp.ClientSession() as session: tasks = [ extract_details(page, session) for page in pages ] list_of_lists = await asyncio.gather(*tasks) store_results(list_of_lists) asyncio.run(main())
CSV 文件应该像以前一样包含所有产品的信息(755 个)。由于我们同时执行所有页面调用,结果不会顺序到达。如果我们将结果添加到extract_details
里面的文件中,可能是无序的。但我们会等待所有任务完成然后处理它们,因此顺序性不会有太大影响。
time python script.py real 0m11,442s user 0m1,332s sys 0m0,060s
我们做到了!提升了3倍,但是……不应该是40倍吗?没那么简单。很多因素都会影响(网络、CPU、RAM等)。
在这个页面中,我们注意到来自时间的响应时,响应时,这然然。可能在服务器的设计/控制下,可能会发生相同的多个请求,不同的显示效果。它不是一种拒绝,而是一个片子。
要查看的加速,您可以针对[6]页面进行测试。这是另一个测试页面,真正需要等待 2 秒然后返回。
base_url = "https://httpbin.org/delay/2" #... async def extract_details(page, session): async with session.get(base_url) as response: #...
这里删除了所有的提取和存储逻辑,只调用了延迟内的 URL 48 次,并在 3 秒内运行完毕。
time python script.py real 0m2,865s user 0m0,245s sys 0m0,031s
如上所述,我们应该提出更多的请求,尤其是域名。
asyncio [7]创建,一个将获得和释放的对象。它的功能将阻止一些调用,以获取锁,从而实现内部最大的功能。
我们需要自己创建自己的信号量。然后等待抽取函数运行,直到async with sem
可用。
max_concurrency = 3 sem = asyncio.Semaphore(max_concurrency) async def extract_details(page, session): async with sem: # semaphore limits num of simultaneous downloads async with session.get(f"{base_url}/{page}/") as response: # ... async def main(): # ... loop = asyncio.get_event_loop() loop.run_until_complete(main())
它完成了工作,相对容易实现!这是最大的设置为 3 的输出。
time python script.py real 0m13,062s user 0m1,455s sys 0m0,047s
这表明如果运行无限期的版本并并没有限制。我们将在整个时间与时间未限定10,总的运行时间相近。
aiohttp
备选方案,可进一步提供解决方案。我们可以提供一种自定义的[8]的解决方案客户端。
我们可以使用适合我们需要的参数来制造它:
-
limit
——《同时连接的总数》。 -
limit_per_host
- “限制同时连接到同一终点的连接数”(今天、港口和is_ssl
)。
max_concurrency = 10 max_concurrency_per_host = 3 async def main(): connector = aiohttp.TCPConnector(limit=max_concurrency, limit_per_host=max_concurrency_per_host) async with aiohttp.ClientSession(connector=connector) as session: # ... asyncio.run(main())
写法也容易实施和维护!这是最大的设置为 3 的输出。
time python script.py real 0m16,188s user 0m1,311s sys 0m0,065s
不同Semaphore
的优势可以选择不同的站点使用和使用不同的区域,我们可以选择不同的区域来使用不同的使用方式,来获取不同的使用方式,带来自己的不同之处。
看起来有点像。需要针对真实情况,使用更多页面和实际数据运行一些慢速测试。
就像我们之前看到的那种,数据是抓取我/O型的。但是,如果我们需要将它与一些CPU紧密计算混合办?为了测试这种情况,我们将使用一个函数函数将在每个人身上count_a_lot
夺取 CPU 强制执行的任务。
def count_a_lot(): count_to = 100_000_000 counter = 0 while counter < count_to: counter = counter + 1 async def extract_details(page, session): async with session.get(f"{base_url}/{page}/") as response: # ... count_a_lot() return pokemon_list
对于 asyncio 版本,只需像以前一样运行它。可能需要很长时间⏳。
time python script.py real 2m37,827s user 2m35,586s sys 0m0,244s
现在,比较难理解的部分来了:
直接引入multiprocessing
看起来有点困难。实际上,我们需要创建一个ProcessPoolExecutor
,它能够“使用一个进程来外部控制执行调用”。用于创建。
但它不会分配负载率。我们将根据 CPU 的数量将页面范围划分成的组件NumPy
。array_split
main
函数的其他部分类似asyncio
版本,但改变了一些语法以匹配multiprocessing
的风格。
这里的区别是我们不会直接调用extract_details
。实际上是可以的,但我们将尝试通过将multiprocessing
与asyncio
混合使用来获得最好的执行效率。
from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count import numpy as np num_cores = cpu_count() # number of CPU cores def main(): executor = ProcessPoolExecutor(max_workers=num_cores) tasks = [ executor.submit(asyncio_wrapper, pages_for_task) for pages_for_task in np.array_split(pages, num_cores) ] doneTasks, _ = concurrent.futures.wait(tasks) results = [ item.result() for item in doneTasks ] store_results(results) main()
长话短说,每个CPU进程都需要抓取。一共有48个,假设你的一个CPU有8个,每个页面进程将请求(6个6 * 8 = 48)。
这六个页面将同时运行!之后计算将需要占用,是 CPU 密集型的。但是我们有很多 CPU,所以它们应该比纯 asyncio 版本运行得来。
async def extract_details_task(pages_for_task): async with aiohttp.ClientSession() as session: tasks = [ extract_details(page, session) for page in pages_for_task ] list_of_lists = await asyncio.gather(*tasks) return sum(list_of_lists, []) def asyncio_wrapper(pages_for_task): return asyncio.run(extract_details_task(pages_for_task))
每个 CPU 进程将使用页面的子集启动一个 asyncio(例如,第一个页面从 1 到 6)。
然后,每个都将调用几个 URL,使用已知的extract_details
函数。
上面的内容需要花点时间来吸收它。整个过程是这样的:
-
创建执行器
-
剖视图
-
asyncio进程启动进程
-
创建一个
aiohttp
会话并创建页面子集的任务 -
抽取每一页的数据
-
合并并存储结果
下面是本次的脚本。虽然之前我们没有提到它,但这里的user
时间却很显眼。对于只是运行asyncio的脚本:
time python script.py real 2m37,827s user 2m35,586s sys 0m0,244s
具有asyncio
和多个进程的版本:
time python script.py real 0m38,048s user 3m3,147s sys 0m0,532s
发现区别了吗?实际运行方面的第一个两分钟,第二个用了 40 秒 CPU 时间user
,第二个时间超过了三分钟!时确实有点多。
预计同时处理“但浪费”了更多时间,是提前完成的显然,您在选择开发方法时,需要考虑调试的复杂度。
我们已经asyncio
看到用于抓取,因为在运行时间都用于网络请求,这种情况属于 I/O 类型密集型适用于单核中的正常处理并且。
如果收集的数据有一些例子是密集型工作,但你可能会改变这种情况。但如果你理解统计数据有一点口语,至少需要出现这种情况。
在这种情况下,你可以比同类更容易完成这项工作。同时我们添加自定义设备,以每个部分aiohttp
的请求连接数、相同的请求目标总数。创建一个可以扩展的数据采集程序了。asyncio
requests
重要的部分是允许的 URL/任务加入程序运行(类似任务),但这是另一篇文章的内容。关注另一个!