Steel Knowledge Base, a knowledge base for learning Python crawlers and data analysis. Life is short, use Python.
When we crawled a site with the requests library earlier, every request made the program wait for the website's response before it could continue. For most of the crawl, the program was simply waiting and doing no useful work.
Tasks dominated by disk or network IO spend most of their time with the CPU waiting for an operation to finish; these are called IO-bound (IO-intensive) tasks. Is there a way to optimize this? Yes: use the aiohttp library to build an asynchronous crawler.
What is aiohttp
With requests, we have to wait for one request to go out and come back before sending the next, which is clearly inefficient. With asynchronous requests there is no such waiting: once a request is sent, the program uses await to suspend that coroutine, regardless of when the response arrives, and moves straight on to the next request.
The solution is aiohttp + asyncio. So what is aiohttp? It is an asynchronous HTTP networking module built on asyncio. It can be used to write asynchronous crawlers that are significantly faster than synchronous crawlers built on requests.
The difference between requests and aiohttp
The difference is that requests is synchronous and aiohttp is asynchronous. The code below shows the effect.
Install aiohttp
pip install aiohttp
- requests synchronous example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: 钢铁知识库
import datetime
import time
import requests


# Synchronous requests
def main():
    start = time.time()
    for i in range(5):
        res = requests.get('http://httpbin.org/delay/2')
        print(f'Current time: {datetime.datetime.now()}, status_code = {res.status_code}')
    print(f'requests synchronous time: {time.time() - start}')


if __name__ == '__main__':
    main()
'''
Current time: 2022-09-05 15:44:51.991685, status_code = 200
Current time: 2022-09-05 15:44:54.528918, status_code = 200
Current time: 2022-09-05 15:44:57.057373, status_code = 200
Current time: 2022-09-05 15:44:59.643119, status_code = 200
Current time: 2022-09-05 15:45:02.167362, status_code = 200
requests synchronous time: 12.785893440246582
'''
You can see that 5 requests take a total of 12.7 seconds. Now look at how long the same requests take when made asynchronously.
- aiohttp asynchronous example:
#!/usr/bin/env python
# file: day6-9同步和异步.py
# author: 钢铁知识库
import asyncio
import datetime
import time
import aiohttp


async def async_http():
    # Declare a context manager that supports async
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/delay/2') as res:
            print(f'Current time: {datetime.datetime.now()}, status_code = {res.status}')


async def main():
    # Run the 5 requests concurrently and wait for all of them to finish
    await asyncio.gather(*(async_http() for _ in range(5)))


start = time.time()
# On Python 3.7+, asyncio.run() starts the event loop; no explicit loop is needed
asyncio.run(main())
print(f'aiohttp asynchronous time: {time.time() - start}')
'''
Current time: 2022-09-05 15:42:32.363966, status_code = 200
Current time: 2022-09-05 15:42:32.366957, status_code = 200
Current time: 2022-09-05 15:42:32.374973, status_code = 200
Current time: 2022-09-05 15:42:32.384909, status_code = 200
Current time: 2022-09-05 15:42:32.390318, status_code = 200
aiohttp asynchronous time: 2.5826876163482666
'''
Comparing the two runs shows the difference: one executes the requests sequentially, the other executes them concurrently. This is the difference between synchronous and asynchronous.
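The same comparison can be reproduced without any network access. The sketch below is only an illustration: `DELAY`, `fetch_sync`, and `fetch_async` are made-up stand-ins for the httpbin delay endpoint, with the 2-second delay shortened to 0.2 seconds.

```python
import asyncio
import time

DELAY = 0.2  # assumed stand-in for the server's 2-second delay


def fetch_sync() -> int:
    time.sleep(DELAY)  # blocks the whole program while "waiting for the server"
    return 200


async def fetch_async() -> int:
    await asyncio.sleep(DELAY)  # suspends only this coroutine
    return 200


def run_sequential(n: int) -> float:
    """Run n simulated requests one after another; return elapsed seconds."""
    start = time.perf_counter()
    for _ in range(n):
        fetch_sync()
    return time.perf_counter() - start


async def _gather(n: int):
    return await asyncio.gather(*(fetch_async() for _ in range(n)))


def run_concurrent(n: int) -> float:
    """Run n simulated requests concurrently; return elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(_gather(n))
    return time.perf_counter() - start


if __name__ == '__main__':
    print(f'sequential: {run_sequential(5):.2f}s')
    print(f'concurrent: {run_concurrent(5):.2f}s')
```

The sequential run should take roughly 5 × DELAY, while the concurrent run should take roughly one DELAY, mirroring the 12.7 s versus 2.6 s measured above.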
Introduction to the use of aiohttp
Next, we introduce the usage of the aiohttp library in detail. aiohttp is a library that supports asynchronous requests; together with asyncio it makes asynchronous request operations very convenient. The asyncio module implements asynchronous operations for TCP, UDP, and SSL, but not for HTTP itself; for HTTP requests we need aiohttp.
aiohttp has two parts, a client and a server. Here we cover the client part.
Basic example
Let's start with a simple example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 钢铁知识库
import asyncio
import aiohttp


async def get_api(session, url):
    # Declare a context manager that supports async
    async with session.get(url) as response:
        return await response.text(), response.status


async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await get_api(session, 'http://httpbin.org/delay/2')
        print(f'html: {html[:50]}')
        print(f'status : {status}')


if __name__ == '__main__':
    # On Python 3.7+, asyncio.run() creates and closes the event loop for us;
    # older code used asyncio.get_event_loop().run_until_complete(main())
    asyncio.run(main())
'''
html: {
"args": {},
"data": "",
"files": {},
status : 200
Process finished with exit code 0
'''
The way aiohttp makes a request is clearly different from what we used before. The main points are:
- Besides importing the aiohttp library, we must also import asyncio, because asynchronous code needs an event loop to run the coroutines.
- Asynchronous functions are defined differently: the async keyword is placed in front of def.
- with ... as declares a context manager that allocates and releases resources automatically; adding async in front makes it an asynchronous context manager.
- Operations that return a coroutine object must be prefixed with await. For example, response.text() returns a coroutine object.
- Finally, the event loop is started to run the coroutine.
Note: on Python 3.7 and later, asyncio.run(main()) is the preferred way to start the event loop; older code calls asyncio.get_event_loop().run_until_complete(main()).
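To make the async with point concrete, here is a minimal sketch using only the standard library. `fake_session` is a hypothetical stand-in for a real session object and is not part of aiohttp; it only records when the resource is acquired and released.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order in which things happen


@asynccontextmanager
async def fake_session(name):
    events.append(f'open {name}')       # resource is allocated on entry
    try:
        yield name
    finally:
        await asyncio.sleep(0)          # cleanup itself is allowed to await
        events.append(f'close {name}')  # resource is released on exit


async def use_session():
    async with fake_session('demo') as session:
        events.append(f'use {session}')
    return events


if __name__ == '__main__':
    print(asyncio.run(use_session()))  # ['open demo', 'use demo', 'close demo']
```

The release step runs even if the body raises, which is why the examples in this article wrap both the session and each response in async with.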
URL parameter settings
To set URL parameters, pass a dictionary through the params argument. For example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 钢铁知识库
import aiohttp
import asyncio


async def main():
    params = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.httpbin.org/get', params=params) as res:
            print(await res.json())


if __name__ == '__main__':
    asyncio.run(main())
'''
{'args': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=钢铁知识库&age=23'}
'''
You can see that the URL actually requested has a query string appended, built from the contents of params.
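The visible effect of params can be mimicked with the standard library. This is only an illustration of how a dictionary becomes a query string; aiohttp builds the URL itself, and `build_url` is a hypothetical helper, not an aiohttp function.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_url(base: str, params: dict) -> str:
    """Append params to base as a percent-encoded query string."""
    return f'{base}?{urlencode(params)}'


if __name__ == '__main__':
    url = build_url('https://www.httpbin.org/get', {'name': '钢铁知识库', 'age': 23})
    print(url)                          # non-ASCII characters appear percent-encoded
    print(parse_qs(urlsplit(url).query))  # decoding recovers the original values
```

Note that every value arrives at the server as a string, which is why the response above shows 'age': '23' rather than the integer 23.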
Request types
In addition to GET requests, aiohttp also supports other request types such as POST, PUT, and DELETE, used in much the same way as in requests.
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
To use these methods, just swap in the corresponding method and parameters. Usage is similar to get, so no separate examples are given here.
Several ways to read the response
We can read different parts of the response: the status code, the response headers, the response body, the body as binary content, and the body parsed as JSON. For example:
#!/usr/bin/env python
# @Author : 钢铁知识库
import aiohttp
import asyncio


async def main():
    data = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.httpbin.org/post', data=data) as response:
            print('status:', response.status)  # status code
            print('headers:', response.headers)  # response headers
            print('body:', await response.text())  # response body as text
            print('bytes:', await response.read())  # response body as bytes
            print('json:', await response.json())  # response body parsed as JSON


if __name__ == '__main__':
    asyncio.run(main())
'''
status: 200
headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
"args": {},
"data": "",
"files": {},
"form": {
"age": "23",
"name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "57",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.8 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
},
"json": null,
"origin": "122.55.11.188",
"url": "https://www.httpbin.org/post"
}
bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "23", \n "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "57", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.8 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n }, \n "json": null, \n "origin": "122.5.132.196", \n "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
'''
You can see that some of these need await in front: any method that returns a coroutine object (that is, a method defined with async) must be awaited.
Timeout setting
We can set a timeout with the help of the ClientTimeout object. For example, to set a timeout of 1 second:
#!/usr/bin/env python
# @Author : 钢铁知识库
import aiohttp
import asyncio


async def main():
    # Set a total timeout of 1 second
    timeout = aiohttp.ClientTimeout(total=1)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # The endpoint waits 2 seconds before responding, so the timeout fires first
        async with session.get('https://www.httpbin.org/delay/2') as response:
            print('status:', response.status)  # status code


if __name__ == '__main__':
    asyncio.run(main())
'''
Traceback (most recent call last):
#### middle omitted ####
raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''
Here the timeout is 1 second while the request is delayed by 2 seconds, so asyncio.TimeoutError is raised. If the request completes in time, it responds with 200 as usual.
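In a crawler we usually want to catch this exception rather than let it stop the program. The sketch below shows the pattern with the standard library only: `slow_request` is a hypothetical stand-in for a slow server, and `asyncio.wait_for` plays the role of the timeout. With aiohttp, the same try/except would wrap the request instead.

```python
import asyncio


async def slow_request(delay: float) -> int:
    """Hypothetical stand-in for a request the server answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return 200


async def fetch_with_timeout(delay: float, timeout: float):
    """Return the status code, or None if the request timed out."""
    try:
        return await asyncio.wait_for(slow_request(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Same exception type as in the traceback above
        return None


if __name__ == '__main__':
    print(asyncio.run(fetch_with_timeout(delay=0.3, timeout=0.05)))  # None (timed out)
    print(asyncio.run(fetch_with_timeout(delay=0.01, timeout=1.0)))  # 200
```

Returning None on timeout is just one possible design; a real crawler might log the URL and retry it later.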
Concurrency limit
aiohttp can support very high concurrency, but the target website may not be able to withstand it and could go down at any time, so the concurrency needs to be controlled. Here we use asyncio's Semaphore to limit the number of concurrent requests. For example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 钢铁知识库
import asyncio
from datetime import datetime
import aiohttp

# Maximum number of concurrent requests
CONCURRENCY = 2


async def get_api(session, semaphore):
    async with semaphore:
        print(f'scraping...{datetime.now()}')
        async with session.get('https://www.baidu.com') as response:
            await asyncio.sleep(2)
            # print(f'Current time: {datetime.now()}, {response.status}')


async def main():
    # Create the semaphore inside the running event loop so it is bound to the
    # loop that actually runs the tasks (this matters on Python versions before 3.10)
    semaphore = asyncio.Semaphore(CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(get_api(session, semaphore)) for _ in range(1000)]
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
'''
scraping...2022-09-07 08:11:14.190000
scraping...2022-09-07 08:11:14.292000
scraping...2022-09-07 08:11:16.482000
scraping...2022-09-07 08:11:16.504000
scraping...2022-09-07 08:11:18.520000
scraping...2022-09-07 08:11:18.521000
'''
In the main method we declare 1000 tasks. Without the Semaphore limit, all 1000 tasks would run at once after being passed to gather, which is a very large amount of concurrency. With the semaphore, the number of tasks running at the same time is capped, which limits how fast aiohttp crawls.
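The effect of the semaphore can be checked without sending any requests. In this sketch, `worker` and `measure_peak` are hypothetical helpers that simulate requests with a short sleep and record the highest number of tasks that were ever inside the semaphore at the same time.

```python
import asyncio


async def worker(semaphore, state):
    async with semaphore:
        state['running'] += 1
        state['peak'] = max(state['peak'], state['running'])
        await asyncio.sleep(0.01)  # simulated request
        state['running'] -= 1


async def measure_peak(limit: int, n_tasks: int) -> int:
    """Run n_tasks workers under a semaphore and return the peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    state = {'running': 0, 'peak': 0}
    await asyncio.gather(*(worker(semaphore, state) for _ in range(n_tasks)))
    return state['peak']


if __name__ == '__main__':
    print(asyncio.run(measure_peak(limit=2, n_tasks=20)))  # 2
```

Even though all 20 tasks are started at once, no more than `limit` of them are ever past the semaphore together.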
aiohttp asynchronous crawling in practice
Next, we will put this into practice with an asynchronous novel crawler. The requirements are as follows:
Target page: https://dushu.baidu.com/pc/detail?gid=4308080950
Catalog interface: https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}
Details interface: https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}
Key parameters: book_id: novel ID; cid: chapter ID
Collection requirements: write it with coroutines and store the data in MongoDB
Requirements analysis: open the target page and capture traffic with the browser's developer tools (F12). Two interfaces show up: a catalog interface and a details interface.
The first step is to request the catalog interface to get each chapter's cid, then pass the cid to the details interface to get the chapter content, and finally store it in MongoDB.
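The two steps can be sketched as small pure functions before any networking is added. `chapters_from_catalog` and `build_detail_url` are hypothetical helpers; the catalog layout (data → novel → items, each with title and cid) follows the crawler code in this article, and percent-encoding the JSON is an assumption of this sketch (the crawler passes the raw JSON and lets the HTTP client encode it).

```python
import json
from urllib.parse import quote

DETAIL_API = 'https://dushu.baidu.com/api/pc/getChapterContent'


def chapters_from_catalog(catalog: dict) -> list:
    """Return (title, cid) pairs from a parsed catalog response."""
    return [(item['title'], item['cid']) for item in catalog['data']['novel']['items']]


def build_detail_url(book_id: str, cid: str) -> str:
    """Build the details URL for one chapter; cid is sent as 'book_id|cid'."""
    payload = json.dumps({'book_id': book_id, 'cid': f'{book_id}|{cid}'},
                         separators=(',', ':'))
    return f'{DETAIL_API}?data={quote(payload)}'


if __name__ == '__main__':
    # Made-up catalog data, used only to show the shape of the input
    fake_catalog = {'data': {'novel': {'items': [{'title': 'Chapter 1', 'cid': '116332'}]}}}
    for title, cid in chapters_from_catalog(fake_catalog):
        print(title, build_detail_url('4295122774', cid))
```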
Without further ado, here is the code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 钢铁知识库
# If it's not right, it's not right; when something truly fits, you won't hesitate for a moment.
import asyncio
import json
import logging
import aiohttp
import requests
from utils.conn_db import ConnDb  # the author's own MongoDB helper module (not shown in this article)

# Log format
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
# Chapter catalog API
b_id = '4308080950'
url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/104.0.0.0 Safari/537.36"
}
# Maximum number of concurrent downloads
CONCURRENCY = 5


async def download(session, semaphore, title, b_id, cid):
    data = {
        "book_id": b_id,
        "cid": f'{b_id}|{cid}',
    }
    data = json.dumps(data)
    detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
    async with semaphore:
        async with session.get(detail_url) as response:
            res = await response.json()
            content = {
                'title': title,
                'content': res['data']['novel']['content']
            }
            # print(title)
            await save_data(content)


async def save_data(data):
    if data:
        client = ConnDb().conn_motor_mongo()
        db = client.baidu_novel
        collection = db.novel
        logging.info('saving data %s', data)
        # Insert the chapter, or update it if the title already exists
        await collection.update_one(
            {'title': data.get('title')},
            {'$set': data},
            upsert=True
        )


async def main():
    # The catalog is fetched once, synchronously, before any concurrent work starts
    res = requests.get(url, headers=headers)
    # Create the semaphore inside the running event loop
    semaphore = asyncio.Semaphore(CONCURRENCY)
    # Share one session across all downloads instead of opening one per chapter
    async with aiohttp.ClientSession(headers=headers) as session:
        tasks = []
        for item in res.json()['data']['novel']['items']:  # get each chapter's cid from the catalog
            title = item['title']
            cid = item['cid']
            # Collect the coroutines in a list, then run them concurrently with gather
            tasks.append(download(session, semaphore, title, b_id, cid))
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())
So far, we have used aiohttp to complete the crawling of novel chapters.
To achieve asynchronous processing, there must first be a way to suspend. When a task has to wait for an IO result, it can be suspended so that other tasks run in the meantime, making full use of resources. To write asynchronous code, you need to understand await: it suspends a time-consuming wait and gives up control. When a coroutine reaches an await during execution, the event loop suspends that coroutine and switches to other coroutines until they are suspended or finished.
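The hand-over of control described above can be observed directly. In this minimal sketch (the names `task`, `demo`, and `log` are made up for illustration), two coroutines each record a step and then await, so the event loop alternates between them.

```python
import asyncio

log = []  # order in which the steps actually run


async def task(name: str, steps: int):
    for i in range(steps):
        log.append(f'{name}{i}')
        await asyncio.sleep(0)  # suspend here and give control back to the event loop


async def demo():
    log.clear()
    await asyncio.gather(task('A', 2), task('B', 2))
    return list(log)


if __name__ == '__main__':
    print(asyncio.run(demo()))  # ['A0', 'B0', 'A1', 'B1']
```

Without the await inside the loop, task A would run both of its steps before B got a chance to start.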
The object following await must be one of the following:
- A native coroutine object returned from a native coroutine function.
- A generator-based coroutine object returned from a generator function decorated with types.coroutine.
- An object with an __await__ method that returns an iterator.
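The three kinds of awaitable listed above can be put side by side in a short sketch. All names here (`native`, `generator_based`, `CustomAwaitable`) are made up for illustration.

```python
import asyncio
import types


async def native():
    """Native coroutine function: calling it returns a native coroutine object."""
    return 'native coroutine'


@types.coroutine
def generator_based():
    """Generator decorated with types.coroutine."""
    yield  # a bare yield hands control to the event loop once
    return 'generator-based coroutine'


class CustomAwaitable:
    """Object whose __await__ method returns an iterator (here, a generator)."""

    def __await__(self):
        yield
        return 'object with __await__'


async def main():
    return [await native(), await generator_based(), await CustomAwaitable()]


if __name__ == '__main__':
    print(asyncio.run(main()))
```

In everyday crawler code only the first kind is written by hand; the other two mostly appear inside libraries.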
---- 20220909 Steel Knowledge Base
Summary
The above covers how to build an asynchronous crawler with two main pieces: coroutines (async/await) and the asynchronous aiohttp library. Crawling a website asynchronously with aiohttp takes far less time than doing it synchronously with requests; in the example above, 5 requests took about 2.6 seconds instead of 12.8. Hopefully the examples listed above are helpful to you.
Note that threads and coroutines are two different concepts. We will cover the relationship between processes and threads, and between threads and coroutines, when the opportunity arises.