第6章 并发下载

张开发
2026/4/13 16:47:51 15 分钟阅读

分享文章

第6章 并发下载
01 多线程爬虫流程分析多线程爬虫将多线程技术运用在采集网页信息和解析网页内容上。多线程爬虫流程如下首先有一个网址列表是要爬取数据的网页列表;同时启动多个线程抓取网页内容一般启动固定数量的线程。将抓取到的网页源码存储在一个列表里。同时使用多个线程对网页源码列表里的网页内容进行解析。将解析之后的数据存储起来。02 使用queue模块实现多线程爬虫queue(队列)模块简介queue模块是Python内置的标准模块可以直接通过import queue引用在Queue模块中提供了三种同步的、线程安全的队列。Queue、LifoQueue、PriorityQueue其中LifoQueue和PriorityQueue都是Queue的子类。这些队列的唯一区别是元素取出的顺序不同。Queue类Queue类是Python标准库中的线程安全的队列实现它提供了一个适用于多线程编程的先进先出的数据结构—队列用于生产者和消费者线程之间的信息传递。Queue类表示一个基本的FIFOFirst In First Out队列即先进先出。queue.Queuemaxsize0其中maxsize参数是个整数它规定了队列的长度。一旦达到上限再添加数据会导致阻塞直到队列中的数据被消费掉。如果maxsize小于或者等于0表示队列大小没有限制。maxsize的默认值为0。常用方法案例1 Queuefrom queue import Queue queue_object Queue() for i in range(4): queue_object.put(i) while not queue_object.empty(): print(queue_object.get())LifoQueue类表示后进先出队列Last in First Out与栈类似都是后进入的元素先出来。queue.LifoQueuemaxsize0其中maxsize参数的含义与Queue类相同。案例2 LifoQueuefrom queue import LifoQueue lifo_queue LifoQueue() for i in range(4): lifo_queue.put(i) while not lifo_queue.empty(): print(lifo_queue.get())PriorityQueue类表示优先级队列按级别顺序取出元素级别最低的最先取出。优先级队列中的元素一般采取元组优先级别数据的形式来存储。queue.PriorityQueuemaxsize0其中maxsize参数的含义与前两个类相同。案例3 PriorityQueuefrom queue import PriorityQueue class Job(object): def __init__(self, level, description): self.level level self.description description return def __lt__(self, other): return self.level other.level priority_queue PriorityQueue() priority_queue.put(Job(5, 中级别工作)) priority_queue.put(Job(10, 低级别工作)) priority_queue.put(Job(1, 重要工作)) while not priority_queue.empty(): next_job priority_queue.get() print(开始工作:, next_job.description)异常类Empty当从空队列中取数据时 可抛出此异常。Full当向一个满队列中存数据时可抛出此异常。案例4 单线程采集解析from lxml import etree import requests import json local_file open(douban_movie_single.json, a, encodingutf-8) def parse_html(html): text etree.HTML(html) node_list text.xpath(//div[classitem]) for node in node_list: title node.xpath(.//span[classtitle])[0].text image node.xpath(.//div[classpic]//a//img/src)[0] rating_num node.xpath(.//span[classrating_num])[0].text comments node.xpath(.//p[classquote]) if len(comments) 0: comment comments[0].text else: comment item { title: title, image: image, rating_num: rating_num, comment: comment } local_file.write(json.dumps(item, ensure_asciiFalse) \n) headers {User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36, Accept-Language: zh-CN,zh;q0.8} if __name__ __main__: for i in range(1,3): start_num (i-1) * 25 url https://movie.douban.com/top250?start str(start_num) print(url) html requests.get(url, headersheaders).text parse_html(html)案例5 多线程采集解析执行流程主线程启动所有采集线程和解析线程↓等待 pageQueue 为空↓设置 CRAWL_EXIT True (通知采集线程退出)↓【第124行】逐个等待采集线程结束 ← 这里会阻塞↓打印爬取线程已退出↓等待 dataQueue 为空↓设置 PARSE_EXIT True (通知解析线程退出)↓逐个等待解析线程结束↓关闭文件代码from lxml import etree import requests from queue import Queue import json import threading #使用线程库 import time CRAWL_EXIT False ##采集网页页码队列是否为空的信号 class ThreadCrawl(threading.Thread): def __init__(self,threadName,pageQueue,dataQueue): threading.Thread.__init__(self) self.threadName threadName #线程名 self.pageQueue pageQueue #页码队列 self.dataQueue dataQueue #数据队列 #请求报头 self.headers {User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36, Accept-Language: zh-CN,zh;q0.8} def run(self): print(启动 self.threadName) while not CRAWL_EXIT: page self.pageQueue.get(False) num (page - 1) * 25 url https://movie.douban.com/top250?start str(num) content requests.get(url, headersself.headers).text time.sleep(5) self.dataQueue.put(content) print(结束 self.threadName) PARSE_EXIT False #网页源码队列是否为空的信号 class ThreadParse(threading.Thread): def __init__(self,threadName, dataQueue, localFile, lock): super(ThreadParse,self).__init__() # 线程名 self.threadName threadName # 数据队列 self.dataQueue dataQueue # 保存解析后数据的文件名 self.localFile localFile # 互斥锁 self.lock lock def run(self): print(启动 self.threadName) while not PARSE_EXIT: try: html self.dataQueue.get(False) self.parse(html) except Exception as e: pass print(结束 self.threadName) def parse(self,html): text etree.HTML(html) node_list text.xpath(//div[classitem]) for node in node_list: try: title node.xpath(.//span[classtitle])[0].text # 图片链接 image node.xpath(.//div[classpic]//a//img/src)[0] # 评论 comments node.xpath(.//p[classquote]//span) if len(comments) 0: quote comments[0].text else: quote # 评价 rating node.xpath(.//span[classrating_num])[0].text # 构建json格式的字符串 items { title: title, image: image, quote: quote, rating: rating } # 获取互斥锁 with self.lock: # 写入解析后的数据 self.localFile.write(json.dumps(items, ensure_asciiFalse) \n) except Exception as e: print(str(e)) if __name__ __main__: pageQueue Queue(20) for i in range(1, 10): pageQueue.put(i) dataQueue Queue() # 以追加的方式打开本地文件 localFile open(douban_multi.json,a, encodingutf-8) lock threading.Lock() #互斥锁 #三个采集线程的名字 crawlList [采集线程1号,采集线程2号,采集线程3号] #创建、启动和存储三个采集线程 threadCrawls [] for threadName in crawlList: thread ThreadCrawl(threadName, pageQueue, dataQueue) thread.start() threadCrawls.append(thread) # 分析线程分析html # 三个解析线程的名字 parseList [解析线程1号,解析线程2号,解析线程3号] # # 创建、启动和存储三个解析线程 threadParses [] for threadName in parseList: thread ThreadParse(threadName,dataQueue,localFile,lock) thread.start() threadParses.append(thread) while not pageQueue.empty(): pass #如果pageQueue为空采集线程退出循环 CRAWL_EXIT True for thread in threadCrawls: thread.join() #阻塞主线程 print(爬取线程已退出) while not dataQueue.empty(): pass PARSE_EXIT True for thread in threadParses: thread.join() print(解析线程已退出) with lock: # 关闭文件在关闭之前内容都存在内存里 localFile.close()思考队列是线程间最常用的存储共享数据的形式这里有个问题为什么使用队列而不使用Python原生的列表或字典类型呢结论ListDict等数据存储类型都是非线程安全的。在多线程中为了防止共享资源的数据不同步问题对资源加锁是个重要的环节。Queue类中实现了所有的锁逻辑能够满足多线程的需求所以在满足使用条件的情况下建议使用队列。分析在 Python 中列表list在多线程环境下‌不是线程安全‌的。虽然某些操作是原子的比如 append() 和 pop()但大多数列表操作都不是原子操作因此在多个线程同时访问或修改列表时可能会导致数据冲突或错误。为什么列表不是线程安全的非原子操作‌一些常见的列表操作如 l[i] x 或 l[i], l[j] l[j], l[i]交换元素并不是原子操作。这些操作涉及多个步骤例如加载、修改和存储这些步骤可能被线程调度机制打断从而导致数据不一致。字节码示例‌例如swap(i, j) 函数中的交换操作会生成多个字节码指令如果两个线程同时执行这样的操作就可能导致结果错误。原子操作的例子‌相反地像 l.append(x) 和 l.pop() 是原子操作因为它们在底层实现中是不可分割的所以可以被认为是线程安全的。如何保证线程安全为了确保在多线程环境中对列表的安全访问你可以采用以下几种方式使用锁Lock‌通过使用 threading.Lock() 来保护对共享列表的访问。确保所有对列表的读写操作都在锁的保护下进行。使用线程安全的数据结构‌例如queue.Queue 是一个线程安全的队列实现适用于生产者-消费者模式。使用 threading.local()‌如果你希望每个线程拥有自己的列表副本可以使用 threading.local() 来隔离线程本地状态。使用 collections.deque‌虽然 deque 也不是完全线程安全的但在某些情况下它可以提供更好的性能。总结Python 列表在多线程环境下的行为取决于具体的操作。原子操作如 append() 和 pop() 是线程安全的但其他操作则需要额外的同步机制来保证线程安全。为了防止竞态条件和数据不一致建议使用锁或其他同步原语来保护共享资源。举例import threading import time # 共享资源 url_list [fhttp://example.com/page/{i} for i in range(100)] current_index 0 # 全局游标 results [] def fetch_page(): global current_index while True: # 【竞态条件点】 # 线程A和线程B可能同时读取到 current_index 5 idx current_index if idx len(url_list): break # 模拟网络延迟加剧竞态条件 time.sleep(0.01) # 线程A和线程B都更新了 current_index但都使用了 idx5 current_index 1 # 结果page 5 被两个线程同时处理 print(fThread {threading.current_thread().name} fetching {url_list[idx]}。) results.append(url_list[idx]) # 启动多个线程 threads [threading.Thread(targetfetch_page, namefWorker-{i}) for i in range(5)] for t in threads: t.start() for t in threads: t.join() print(fTotal fetched: {len(results)}, Unique: {len(set(results))}) # 输出可能显示 Total Unique说明有重复03 协程实现并发爬取协程爬虫流程分析所谓协程就是同时开启多个任务但一次只顺序执行一个。等到所执行的任务遭遇阻塞就切换到下一个任务继续执行以期节省下阻塞所占用的时间。单进程下,协程和多线程区别并没有很大相较之下协程更节省资源、效率更高、并且更安全。 多进程下多线程可以利用多核资源这是单进程的协程模型做不到的。由于协程的切换不像多线程调度那样耗费资源所以不用严格限制协程的数量。协程爬虫流程如下首先将要爬取的网址存储在一个列表中。为每个网址创建一个协程并启动该协程。将抓取到的目标数据存储在一个列表里。遍历数据列表将数据存储在本地文件中。gevent库简介gevent是一个基于协程的Python网络库是一个第三方库。安装pip install gevent -i https://mirrors.aliyun.com/pypi/simple/引用import gevent常用方法gevent.spawn()方法创建并启动协程。gevent.joinall()方法等待所有协程执行完毕。

更多文章