这里通过代码一步一步的演变,最后完成的是一个精简的Scrapy。在Scrapy内部,基本的流程就是这么实现的。主要是为了能通过学习了解Scrapy大致的流程,对之后再要去看Scrapy的源码也是有帮助的。

Twisted使用

因为Scrapy是基于Twisted实现的,所以先看Twisted怎么用

基本使用

基本使用的示例:

from twisted.web.client import getPage, deferfrom twisted.internet import reactor# 所有任务完成后的回调函数def all_done(arg): """所有爬虫执行完后执行,循环终止""" print("All Done") reactor.stop()# 单个任务的回调函数def callback(contents): """每个爬虫获取到结果后执行""" print(contents)deferred_list = []url_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]for url in url_list: deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(callback) deferred_list.append(deferred)dlist = defer.DeferredList(deferred_list)dlist.addBoth(all_done)if __name__ == '__main__': reactor.run()

在for循环里,创建了对象,还给对象加了回调函数,这是单个任务完成后执行的。此时还没有进行下载,而是把所有的对象加到一个列表里。
之后的defer.DeferredList的调用,才是执行所有的任务。并且又加了一个回调函数all_done,这个是所有任务都完成后才执行的。

基于装饰器1

基于装饰器也可以实现,下面的代码是基于上面的示例做了简单的转换:

from twisted.web.client import getPage, deferfrom twisted.internet import reactordef all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)@defer.inlineCallbacksdef task(url): deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(one_done) yield deferreddeferred_list = []url_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]for url in url_list: deferred = task(url) deferred_list.append(deferred)dlist = defer.DeferredList(deferred_list)dlist.addBoth(all_done)if __name__ == '__main__': reactor.run()

把原来for循环里的2行代码封装的了一个task函数里,并且加了装饰器。
这个task函数有3个要素:装饰器、deferred对象、通过yield返回返回对象。这个是Twisted里标准的写法。

基于装饰器2

在上面的示例的基础上,把整个for循环都移到task函数里了:

from twisted.web.client import getPage, deferfrom twisted.internet import reactordef all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)@defer.inlineCallbacksdef task(): for url in url_list: deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(one_done) yield deferredurl_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]ret = task()ret.addBoth(all_done)if __name__ == '__main__': reactor.run()

上面说个的3要素:装饰器、deferred对象、yield都有。

基于装饰器永不退出

在前面的示例中,每完成一个任务,就会返回并执行一个回调函数one_done。所有任务如果都返回了,程序就会退出(退出前会执行回调函数all_done)。
这里所做的,就是添加一个不会返回的任务,这样程序的一直不会退出了:

from twisted.web.client import getPage, deferfrom twisted.internet import reactordef all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)@defer.inlineCallbacksdef task(): for url in url_list: deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(one_done) yield deferred # 下面的这个任务永远不会完成 stop_deferred = defer.Deferred() # 这是一个空任务,不会去下载,所以永远不会返回 # stop_deferred.callback(None) # 执行这句可以让这个任务返回 stop_deferred.addCallback(lambda s: print(s)) stop_deferred.callback("stop_deferred") yield stop_deferredurl_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]ret = task()ret.addBoth(all_done)if __name__ == '__main__': reactor.run()

这里的做法,就是加了一个额外的任务。要求返回的是Deferred对象,这里就创建了一个空的Deferred对象,并把这个对象返回。
在这里,我们并没有让这个空的Deferred对象去下载,所以也就永远不会有返回。
永不退出的意义
这里目的就是不让程序退出,让这个事件循环一直在那里执行。之后还可以继续往里面添加任务,然后执行新的任务。
程序退出的方法
还是可以让程序退出的。就是调用stop_deferred的callback方法,在上面的代码里注释掉了。执行这个方法,就是强制执行该任务的回调函数。
之前都是等任务执行完返回后,会自动调用callback方法,这里就是强制调用了。
并且由于代码里没有为stop_deferred指定回调函数,所有调用方法后不会执行任何函数。不过调用callback方法必须有一个参数,这里随便写个就好了。
也可以给stop_deferred加一个回调函数,然后再调用callback方法:

stop_deferred.addCallback(lambda s: print(s))stop_deferred.callback("stop_deferred")

Scrapy里的做法
这就是Scrapy里运行完终止的逻辑。第一次只有一个url,执行完就返回了,并且此时应该是所有任务都返回了,那么就会退出程序。
在Scrapy里,也是这样加了一个永远不会返回的任务,不让程序退出。然后之前的结果返回后,又会生成新的任务到调度器,这样就会动态的添加任务继续执行。
要让程序可以退出,这里还需要做一个检测。在下载完成之后的回调函数里,会生成新的任务继续给执行。这里可以执行2个回调函数。
第一个回调函数就是生成新的任务放入调度器,第二个回调函数就是检测等待执行的任务的数量,以及正在执行的任务数量。如果都是0,表示程序可以结束了。
程序结束的方法就是上面的用的调用执行callback方法。

执行完毕后停止事件循环

基于上面的说的,这里的代码实现了全部任务执行完毕后可以调用stop_deferred的callback方法来退出:

from twisted.web.client import getPage, deferfrom twisted.internet import reactortask_list = []stop_deferred = Nonedef all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)def check_empty(response, *args, **kw): url = kw.get('url') if url in running_list: running_list.remove(url) if not running_list: stop_deferred.callback()@defer.inlineCallbacksdef task(): global running_list, stop_deferred # 全局变量 running_list = url_list.copy() for url in url_list: deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(one_done) deferred.addCallback(check_empty, url=url) yield deferred stop_deferred = defer.Deferred() yield stop_deferredurl_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]ret = task()ret.addBoth(all_done)if __name__ == '__main__': reactor.run()代码优化

上面的代码功能上都实现了,但是实现方法有点不太好。
首先,task函数里分成了两部分,一部分是我们自己调度的任务,一部分是为了不让程序退出,而加的一个空任务。可以把这两部分拆开放在两个函数里。分拆之后,只有第一部分的函数是需要留给用户使用的。下面是把原来的task函数分拆后的代码,并且每个函数也都需要加上装饰器:

from twisted.web.client import getPage, deferfrom twisted.internet import reactortask_list = []stop_deferred = Nonedef all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)def check_empty(response, url): if url in running_list: running_list.remove(url) if not running_list: stop_deferred.callback()@defer.inlineCallbacksdef open_spider(): global running_list running_list = url_list.copy() for url in url_list: deferred = getPage(bytes(url, encoding='utf-8')) deferred.addCallback(one_done) deferred.addCallback(check_empty, url) yield deferred@defer.inlineCallbacksdef stop(): global stop_deferred stop_deferred = defer.Deferred() yield stop_deferred@defer.inlineCallbacksdef task(): yield open_spider() yield stop()url_list = [ 'http://www.bing.com', 'http://www.baidu.com', 'http://edu.51cto.com',]ret = task()ret.addBoth(all_done)if __name__ == '__main__': reactor.run()

另外还有全局变量的问题,这里的代码使用了全部变量,这不是一个好的做法。再改下去需要引入class了。

模拟Scrapy

从这里开始,就要使用面向对象的方法,进一步进行封装了。

封装部分

先把之前主要的代码封装到类里:

from twisted.web.client import getPage, deferfrom twisted.internet import reactorimport queueclass Request(object): """封装请求的url和回调函数""" def __init__(self, url, callback): self.url = url self.callback = callbackclass Scheduler(object): """调度器""" def __init__(self, engine): self.engine = engine self.q = queue.Queue() def enqueue_request(self, request): """添加任务""" self.q.put(request) def next_request(self): """获取下一个任务""" try: req = self.q.get(block=False) except queue.Empty: req = None return req def size(self): return self.q.qsize()class ExecutionEngine(object): """引擎""" def __init__(self): self._close_wait = None # stop_deferred self.start_requests = None self.scheduler = Scheduler(self) self.in_progress = set() # 正在执行中的任务 def _next_request(self): while self.start_requests: request = next(self.start_requests, None) if request: self.scheduler.enqueue_request(request) else: self.start_requests = None while len(self.in_progress) < 5 and self.scheduler.size() > 0: # 最大编发为5 request = self.scheduler.next_request() if not request: break self.in_progress.add(request) d = getPage(bytes(request.url, encoding='utf-8')) # addCallback是正确返回的时候执行,还有addErrback是返回有错误的时候执行 # addBoth就是上面两种情况返回都会执行 d.addBoth(self._handle_downloader_output, request) d.addBoth(lambda x, req: self.in_progress.remove(req), request) d.addBoth(lambda x: self._next_request()) if len(self.in_progress) == 0 and self.scheduler.size() == 0: self._close_wait.callback(None) def _handle_downloader_output(self, response, request): import types gen = request.callback(response) if isinstance(gen, types.GeneratorType): # 是否为生成器类型 for req in gen: # 这里还可以再加判断,如果是request对象则继续爬取 # 如果是item对象,则可以交给pipline self.scheduler.enqueue_request(req) @defer.inlineCallbacks def open_spider(self, start_requests): self.start_requests = start_requests yield None reactor.callLater(0, self._next_request) # 过多少秒之后,执行后面的函数 @defer.inlineCallbacks def start(self): """原来的stop函数""" self._close_wait = defer.Deferred() yield self._close_wait@defer.inlineCallbacksdef crawl(start_requests): """原来的task函数""" engine = ExecutionEngine() start_requests = iter(start_requests) yield engine.open_spider(start_requests) yield engine.start()def all_done(arg): print("All Done") reactor.stop()def one_done(response): print(response)count = 0def chouti(response): """任务返回后生成新的Request继续交给调度器执行""" global count count += 1 print(response) if count > 3: return None for i in range(10): yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x)))if __name__ == '__main__': url_list = [ 'http://www.bing.com', 'https://www.baidu.com', 'http://edu.51cto.com', ] requests = [Request(url, callback=one_done) for url in url_list] # requests = [Request(url, callback=chouti) for url in url_list] ret = crawl(requests) ret.addBoth(all_done) reactor.run()

这里还写了一个回调函数chouti,可以在爬虫返回后,生成新的Request继续爬取。为了控制这个回调函数的调用,又加了一个全局变量。
接下来会对这部分函数继续封装,把所有的代码都封装到类里。
闭包解决全局变量
这里的部分是我自己尝试的思考。
其实还可以通过闭包的方法。通过闭包来保存函数的状态,而不使用全局变量:

def chouti2(): n = 0 def func(response): print(response) nonlocal n n += 1 if n > 3: return None for i in range(10): yield Request("http://dig.chouti.com/all/hot/recent/%s" % i, lambda x: print(len(x))) return funcif __name__ == '__main__': url_list = [ 'http://www.bing.com', 'https://www.baidu.com', 'http://edu.51cto.com', ] # requests = [Request(url, callback=one_done) for url in url_list] # requests = [Request(url, callback=chouti) for url in url_list] callback = chouti2() requests = [Request(url, callback=callback) for url in url_list] ret = crawl(requests) ret.addBoth(all_done) reactor.run()完全封装

上面的示例还有几个函数,继续把剩下的函数也封装到类里。下面的这个就是TinyScrapy:

from twisted.web.client import getPage, deferfrom twisted.internet import reactorimport queueclass Request(object): """封装请求的url和回调函数""" def __init__(self, url, callback=None): self.url = url self.callback = callback # 默认是None,则会去调用Spider对象的parse方法class Scheduler(object): """调度器""" def __init__(self, engine): self.engine = engine self.q = queue.Queue() def enqueue_request(self, request): """添加任务""" self.q.put(request) def next_request(self): """获取下一个任务""" try: req = self.q.get(block=False) except queue.Empty: req = None return req def size(self): return self.q.qsize()class ExecutionEngine(object): """引擎""" def __init__(self): self._close_wait = None # stop_deferred self.start_requests = None self.scheduler = Scheduler(self) self.in_progress = set() # 正在执行中的任务 self.spider = None # 在open_spider方法里添加 def _next_request(self): while self.start_requests: request = next(self.start_requests, None) if request: self.scheduler.enqueue_request(request) else: self.start_requests = None while len(self.in_progress) < 5 and self.scheduler.size() > 0: # 最大编发为5 request = self.scheduler.next_request() if not request: break self.in_progress.add(request) d = getPage(bytes(request.url, encoding='utf-8')) # addCallback是正确返回的时候执行,还有addErrback是返回有错误的时候执行 # addBoth就是上面两种情况返回都会执行 d.addBoth(self._handle_downloader_output, request) d.addBoth(lambda x, req: self.in_progress.remove(req), request) d.addBoth(lambda x: self._next_request()) if len(self.in_progress) == 0 and self.scheduler.size() == 0: self._close_wait.callback(None) # 这个方法和之前的有一点小的变化,主要是用到了新定义的Response对象 def _handle_downloader_output(self, body, request): import types response = Response(body, request) # 如果没有指定callback就调用Spider类的parse方法 func = request.callback or self.spider.parse gen = func(response) if isinstance(gen, types.GeneratorType): # 是否为生成器类型 for req in gen: # 这里还可以再加判断,如果是request对象则继续爬取 # 如果是item对象,则可以交给pipline self.scheduler.enqueue_request(req) @defer.inlineCallbacks def open_spider(self, spider, start_requests): self.start_requests = start_requests self.spider = spider # 加了这句 yield None reactor.callLater(0, self._next_request) # 过多少秒之后,执行后面的函数 @defer.inlineCallbacks def start(self): """原来的stop函数""" self._close_wait = defer.Deferred() yield self._close_waitclass Response(object): def __init__(self, body, request): self.body = body self.request = request self.url = request.url @property def text(self): return self.body.decode('utf-8')class Crawler(object): def __init__(self, spider_cls): self.spider_cls = spider_cls self.spider = None self.engine = None @defer.inlineCallbacks def crawl(self): self.engine = ExecutionEngine() self.spider = self.spider_cls() start_requests = iter(self.spider.start_requests()) yield self.engine.open_spider(self.spider, start_requests) yield self.engine.start()class CrawlerProcess(object): def __init__(self): self._active = set() self.crawlers = set() def crawl(self, spider_cls, *args, **kwargs): crawler = Crawler(spider_cls) self.crawlers.add(crawler) d = crawler.crawl(*args, **kwargs) self._active.add(d) return d def start(self): dl = defer.DeferredList(self._active) dl.addBoth(self._stop_reactor) reactor.run() @classmethod def _stop_reactor(cls, _=None): """原来的all_done函数 之前的示例中,这个函数都是要接收一个参数的。 虽然不用,但是调用的模块一定会传过来,所以一定要接收一下。 这里就用了占位符来接收这个参数,并且设置了默认值None。 """ print("All Done") reactor.stop()class Spider(object): def __init__(self): if not hasattr(self, 'start_urls'): self.start_urls = [] def start_requests(self): for url in self.start_urls: yield Request(url) def parse(self, response): print(response.body)class ChoutiSpider(Spider): name = "chouti" start_urls = ["http://dig.chouti.com"] def parse(self, response): print(next((s for s in response.text.split('\n') if "<title>" in s)))class BingSpider(Spider): name = "bing" start_urls = ["http://www.bing.com"]class BaiduSpider(Spider): name = "baidu" start_urls = ["http://www.baidu.com"]if __name__ == '__main__': spider_cls_list = [ChoutiSpider, BingSpider, BaiduSpider] crawler_process = CrawlerProcess() for spider_cls in spider_cls_list: crawler_process.crawl(spider_cls) crawler_process.start()

这里用的类名、方法名、部分代码都是和Scrapy的源码里一样的。相当于把Scrapy精简了,把其中的核心都提取出来了。如果能看明白这部分代码,再去Scrapy里看源码应该能相对容易一些了。