爬虫进阶:框架功能完善-金年会app官方网
利用线程池实现异步
异步任务分析:
在引擎中,实现的主要功能如下
- 上面的方框中是关于start_urls中的请求处理
- 下面的方框中是一个请求从调度器取出请求,进行下载之后交给爬虫解析再交给管道的过程 在以上两个过程中,他们之间没有直接的联系,都可以通过异步多线程的方式分别实现,加快程序执行的速度
那么具体该如何实现该逻辑
- multiprocessing.dummy 提供的pool 类具有apply_async的方法,能够异步的执行让他运行的函数
- apply_async方法能够接收一个callback,即其中的函数执行完成之后继续会做的事情,在这里,我们可以定义一个callback,其中让他继续执行上图中下方框的任务,同时给他一个停止条件,
利用回调实现循环
利用回调实现递归,可以达到循环的目的
# scrapy_plus/core/engine.py
import time
from multiprocessing.dummy import pool # 导入线程池对象
import importlib
from datetime import datetime
from scrapy_plus.http.request import request # 导入request对象
from scrapy_plus.utils.log import logger # 导入logger
from scrapy_plus.conf import settings
from .scheduler import scheduler
from .downloader import downloader
class engine(object):
'''
a. 对外提供整个的程序的入口
b. 依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)
'''
def __init__(self):
self.scheduler = scheduler() # 初始化调度器对象
self.downloader = downloader() # 初始化下载器对象
self.spiders = self._auto_import_instances(settings.spiders, true) # 动态导入并实例化爬虫对象
self.pipelines = self._auto_import_instances(settings.pipelines) # 动态导入并实例化管道对象
self.spider_mids = self._auto_import_instances(settings.spider_middlewares) # 动态导入并实例化爬虫中间件对象
self.downloader_mids = self._auto_import_instances(settings.downloader_middlewares) # 动态导入并实例化下载器中间件对象
self.total_response_number = 0
self.pool = pool() # 创建线程池对象
self.running = false # 记录是否退出程序的状态
def start(self):
'''启动整个引擎'''
start = datetime.now() # 起始时间
logger.info("开始运行时间:%s" % start) # 使用日志记录起始运行时间
self._start_engine()
stop = datetime.now() # 结束时间
logger.info("开始运行时间:%s" % stop) # 使用日志记录结束运行时间
logger.info("耗时:%.2f" % (stop - start).total_seconds()) # 使用日志记录运行耗时
def _start_requests(self):
'''向调度器添加初始请求'''
# 1. 爬虫模块发出初始请求
for spider_name, spider in self.spiders.items():
for start_request in spider.start_requests():
# 2. 把初始请求添加给调度器
# 利用爬虫中间件预处理请求对象
for spider_mid in self.spider_mids:
start_request = spider_mid.process_request(start_request)
start_request.spider_name = spider_name # 为请求对象绑定它所属的爬虫的名称
self.scheduler.add_request(start_request)
def _execute_request_response_item(self):
'''根据请求、发起请求获取响应、解析响应、处理响应结果'''
# 3. 从调度器获取请求对象,交给下载器发起请求,获取一个响应对象
request = self.scheduler.get_request()
if request is none:
return
# 利用下载器中间件预处理请求对象
for downloader_mid in self.downloader_mids:
request = downloader_mid.process_request(request)
# 4. 利用下载器发起请求
response = self.downloader.get_response(request)
# 利用下载器中间件预处理响应对象
for downloader_mid in self.downloader_mids:
response = downloader_mid.process_response(response)
spider = self.spiders[request.spider_name] # 根据请求的spider_name属性,获取对应的爬虫对象
# 5. 利用爬虫的解析响应的方法,处理响应,得到结果
parse = getattr(spider, request.parse) # 获取对应的解析函数
results = parse(response) # parse函数的返回值是一个容器,如列表或者生成器对象
for result in results:
# 6. 判断结果对象
# 6.1 如果是请求对象,那么就再交给调度器
if isinstance(result, request):
# 利用爬虫中间件预处理请求对象
for spider_mid in self.spider_mids:
result = spider_mid.process_request(result)
result.spider_name = request.spider_name # 为请求对象绑定它所属的爬虫的名称
self.scheduler.add_request(result)
# 6.2 否则,就交给管道处理
else:
# 利用爬虫中间件预处理数据对象
for spider_mid in self.spider_mids:
result = spider_mid.process_item(result)
for pipeline in self.pipelines: # 多个管道对象,轮流处理item对象
result = pipeline.process_item(result, spider)
# 统计响应总数
self.total_response_number = 1
def _callback(self, temp):
'''执行新的请求的回调函数,实现循环'''
if self.running is true: # 如果还没满足退出条件,那么继续添加新任务,否则不继续添加,终止回调函数,达到退出循环的目的
self.pool.apply_async(self._execute_request_response_item, callback=self._callback)
def _start_engine(self):
'''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
self.running = true # 启动引擎,设置状态为true
# 向调度器添加初始请求
self.pool.apply_async(self._start_requests) # 使用异步
self.pool.apply_async(self._execute_request_response_item, callback=self._callback) # 利用回调实现循环
# 设置循环,处理多个请求
while true:
time.sleep(0.0001) # 避免cpu空转,消耗性能
# 根据请求、发起请求获取响应、解析响应、处理响应结果
# self._execute_request_response_item()
# 设置退出条件:当请求数和响应数相等时,退出循环
# 因为异步,需要增加判断,请求数不能为0
if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
self.running = false # 满足循环退出条件后,设置运行状态为false
break
self.pool.close()
self.pool.join()
实现异步并发控制
在配置文件中设置最大并发数,并在引擎中使用
# scrapy_plus/core/engine.py
class engine(object):
......
def _start_engine(self):
self.running = true
'''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
# 向调度器添加初始请求
self.pool.apply_async(self._start_requests) # 使用异步
# 控制最大并发数
for i in range(settings.max_async_number):
self.pool.apply_async(self._execute_request_response_item, callback=self._callback) # 利用回调实现循环
while true:
time.sleep(0.0001) # 避免cpu空转,消耗性能
# 设置退出条件:当请求数和响应数相等时,退出循环
# 因为异步,需要增加判断,请求数不能为0
if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
self.running = false # 满足循环退出条件后,设置运行状态为false
break
self.pool.close()
self.pool.join()
对异步任务进行异常控制,增加异常回调函数error_callback
# scrapy_plus/core/engine.py
class engine(object):
......
def _callback(self, temp):
'''执行新的请求的回调函数,实现循环'''
if self.running is true: # 如果还没满足退出条件,那么继续添加新任务,否则不继续添加,终止回调函数,达到退出循环的目的
self.pool.apply_async(self._execute_request_response_item, callback=self._callback, error_callback=self._error_callback)
def _error_callback(self, exception):
'''异常回调函数'''
try:
raise exception # 抛出异常后,才能被日志进行完整记录下来
except exception as e:
logger.exception(e)
def _start_engine(self):
self.running = true
'''依次调用其他组件对外提供的接口,实现整个框架的运作(驱动)'''
# 向调度器添加初始请求
self.pool.apply_async(self._start_requests, error_callback=self._error_callback) # 使用异步
for i in range(settings.max_async_number):
self.pool.apply_async(self._execute_request_response_item, callback=self._callback, error_callback=self._error_callback) # 利用回调实现循环
while true:
time.sleep(0.0001) # 避免cpu空转,消耗性能
# 设置退出条件:当请求数和响应数相等时,退出循环
# 因为异步,需要增加判断,请求数不能为0
if self.total_response_number >= self.scheduler.total_request_number and self.scheduler.total_request_number != 0:
self.running = false # 满足循环退出条件后,设置运行状态为false
break
self.pool.close()
self.pool.join()