多线程介绍
多线程时为了同步完成多项任务,通过提高资源使用效率来提高系统的效率。线程是在同一时间需要完成多项任务的时候实现的。就好比多线程是火车的每一节车厢,而进程就是火车,车厢离开火车无法启动。多线程的出现是为了提高效率。
threading模块介绍
threading模块是python中专门提供用来多线程编程的模块。threading模块中最常用的类是Thread。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import threading import time
def coding(): for x in range(3): print(f"{x} 正在写代码") time.sleep(1)
def drawing(): for x in range(3): print(f"{x} 正在画图") time.sleep(1)
def multi_thread(): t1 = threading.Thread(target=coding) t2 = threading.Thread(target=drawing)
t1.start() t2.start() if __name__ == "__main__": multi_thread()
|
查看线程数
使用threading.enumerate()函数可以看到当前线程的数量
查看当前线程的名字
使用threading.current_thread()函数可以看到当前线程的信息。
继承自threading.Thread类
为了让线程代码得到更好的封装,可以使用到threading模块下的Thread类,继承自Thread类后,然后实现run方法,线程代码会自动运行run方法中的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import threading import time
class CodingThread(threading.Thread): def run(self): for x in range(5): print(f'{threading.current_thread()} 正在写代码') time.sleep(1)
class DrawingThread(threading.Thread): def run(self): for x in range(3): print(f'{threading.current_thread()} 正在画图') time.sleep(1)
def multi_thread(): t1 = CodingThread() t2 = DrawingThread()
t1.start() t2.start() if __name__ == "__main__": multi_thread()
|
多线程共享全局变量的问题
多线程都是在同一个进程中运行的,因此在进程中的全局变量多有线程都是可共享的。由于线程执行的顺序是无序的,线程如果同时执行,有可能会造成数据错误。例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import threading
tickets = 0
def get_ticket(): global tickets for x in range(1000000): tickets += 1 print(f"tickets: {tickets}")
def main(): for x in range(2): t = threading.Thread(target=get_ticket) t.start()
if __name__ == "__main__": main()
|
锁机制
为了解决以上使用共享全局变量的问题,threading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程此时无法使用,直到当前线程处理完成后,把锁释放,其他线程才能使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import threading
tickets = 0 lock = threading.Lock()
def get_ticket(): global tickets lock.acquire() for x in range(1000000): tickets += 1 lock.release() print(f"tickets: {tickets}")
def main(): for x in range(2): t = threading.Thread(target=get_ticket) t.start()
if __name__ == "__main__": main()
|
_注意_:lock锁加在修改全局变量的位置,如果只是访问全局变脸没有必要加锁。
Lock生产者和消费者模式
生产者和消费者模式是多线程开发中经常看到的一种模式。生产者的线程专门用来生产一些数据,然后存放在一个中间变量中。消费者再从这个中间变量中取出数据进行消费。但是因为要使用中间变量,中间变量由于经常是些全局变量,因此在使用的时候需要使用Lock锁来保证数据完整性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| import threading import random import time
gMoney = 1000 gLock = threading.Lock()
gTimes = 0
class Producer(threading.Thread): def run(self): global gMoney global gTimes while True: money = random.randint(100, 1000) gLock.acquire() if gTimes >= 10: gLock.release() break gMoney += money print(f"{threading.current_thread()} 当前存入 {money} 元,剩余 {gMoney} 元") gTimes += 1 time.sleep(1) gLock.release()
class Consumer(threading.Thread): def run(self): global gMoney global gTimes while True: money = random.randint(100, 500) gLock.acquire() if gMoney > money: gMoney -= money print(f"{threading.current_thread()} 取出 {money} 元,剩余 {gMoney} 元") time.sleep(1) else: if gMoney >= 10: gLock.release() break print(f"{threading.current_thread()} 想取出 {money} 元,剩余 {gMoney} 元,不足!") gLock.release()
def main(): for x in range(5): Consumer(name=f"消费者线程{x}").start()
for x in range(5): Producer(name=f"生产者线程{x}").start()
if __name__ == "__main__": main()
|
Condition生产者和消费者模式
Lock中的生产者与消费者模式可以正常运行,但是存在一个问题,在消费者中,总是通过while死循环上锁的方式判断money的多少。由于上锁是一个很消耗CPU资源的行为,因此此方式不是最好的。还有一种更好的方式是使用threading.Condition来实现。threading.Condition可以在没有数据的时候处于阻塞等待状态。一旦有合适的数据,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样就可以不用做一些无用的上锁和解锁的操作,还可以提升程序的性能。threading.Condition类似threading.Lock,可以在修改全局数据的时候进行加锁,也可以在修改完毕后解锁。以下对部分常用函数进行简单介绍:
- acquire:上锁
- release:解锁
- wait:将当前线程处于等待状态,并且释放锁。可以被其他线程使用notify和notify_all函数进行唤醒。被唤醒后会继续等待上锁,上锁后继续执行下面的代码。
- notify:通知某个正在等待的线程,默认是第一个等待的线程。
- notify_all:通知所有正在等待的线程。notify和notify_all不会释放锁。并且需要在release之前调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| import threading import random import time
gMoney = 1000 gCondition = threading.Condition()
gTimes = 0 gTotalTimes = 5
class Producer(threading.Thread): def run(self): global gMoney global gCondition global gTimes while True: money = random.randint(100, 1000) gCondition.acquire() if gTimes >= gTotalTimes: gCondition.release() print(f"当前生产者总共生产了{gTimes}次") break gMoney += money print(f"{threading.current_thread()} 当前存入 {money} 元,剩余 {gMoney} 元") gTimes += 1 time.sleep(1) gCondition.notify_all() gCondition.release()
class Consumer(threading.Thread): def run(self): global gMoney global gCondition global gTimes while True: money = random.randint(100, 500) gCondition.acquire() while gMoney < money: if gTimes >= gTotalTimes: gCondition.release() return print(f"{threading.current_thread()} 想取出 {money} 元,剩余 {gMoney} 元,不足!") gCondition.wait()
gMoney -= money print(f"{threading.current_thread()} 取出 {money} 元,剩余 {gMoney} 元") time.sleep(1) gCondition.release()
def main(): for x in range(5): Consumer(name=f"消费者线程{x}").start()
for x in range(5): Producer(name=f"生产者线程{x}").start()
if __name__ == "__main__": main()
|
Queue线程安全队列
在线程中,访问一些全局变量,加锁是一个经常的过程。如果你想把一些数据存储到某个队列中,可以使用Python内置的Queue模块。Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列、LIFO(后进先出)队列。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列实现线程间的同步。相关函数:
- 初始化Queue(maxsize):创建一个先进先出的队列
- qsize():返回队列的大小
- empty():判断队列是否为空
- full():判断队列是否满了
- get():从队列中取最后一个数据
- put():将一个数据放到队列中
使用生产者消费者模式多线程下载表情包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
import threading import requests import os import re from lxml import etree import util from queue import Queue
class Producer(threading.Thread): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36', }
def __init__(self, page_queue, img_queue, *args, **kwargs): super(Producer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.img_queue = img_queue
def run(self): while True: if self.page_queue.empty(): break url = self.page_queue.get() self.parse_page(url)
def parse_page(self, url): response = requests.get(url, headers=self.headers) text = response.text html = etree.HTML(text) imgs = html.xpath('//a[@class="col-xs-6 col-sm-3"]/img') for img in imgs: if img.get('class') == 'gif': continue img_url = img.get('data-original') suffix = os.path.splitext(img_url)[1].replace('.null', '.jpg') alt = img.get('alt') alt = re.sub(r'[,。?!!?,/\\.]', '', alt) img_name = alt + suffix self.img_queue.put((img_url, img_name))
class Consumer(threading.Thread): headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36', }
def __init__(self, page_queue, img_queue, *args, **kwargs): super(Consumer, self).__init__(*args, **kwargs) self.page_queue = page_queue self.img_queue = img_queue
def run(self): while True: if self.img_queue.empty(): if self.page_queue.empty(): return img = self.img_queue.get(block=True) url, filename = img if not os.path.exists('images'): os.makedirs('images/') result = util.retrieve(url, headers=self.headers, path='images/'+filename) if result: print(f'{filename} 下载完成!') else: print(f'{filename} 下载失败!')
def main(): page_queue = Queue(100) img_queue = Queue(500) for x in range(1, 10): url = f'http://www.doutula.com/photo/list/?page={x}' page_queue.put(url)
for x in range(5): t = Producer(page_queue, img_queue) t.start()
for x in range(5): t = Consumer(page_queue, img_queue) t.start()
if __name__ == '__main__': main()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
from urllib import parse, request
def retrieve(url, headers=None, path=None): if not isinstance(headers, dict): return False
opener = request.build_opener() opener.addheaders = [result for result in zip(headers.keys(), headers.values())] request.install_opener(opener) request.urlretrieve(url, path)
return True
def urlopen(url, headers=None, data=None,origin_req_host=None, unverifiable=False, method=None): req = request.Request(url, headers=headers, data=data, origin_req_host=origin_req_host, unverifiable=unverifiable, method=method) resp = request.urlopen(req) return resp
|
GIL全局解释器锁
Python自带的解释器是CPython。CPython解释器的多线程实际上是一个伪多线程(在多核CPU中,只能利用一核,无法利用多核的优势)。同一时刻只有一个线程执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中便引入了GIL(Global Intepreter Lock),叫做全局解释器锁。因为在CPython解释器的内存管理不是线程安全的,所以还有其他的解释器。
Ipython
Ipython是基于CPython之上一个交互解释器,IPython只是在交互方式上有所增强,但是执行Python代码的功能和CPython是完全一样的。
PyPy
PyPy是另一个Python解释器,它的目标是执行速度,PyPy采用JIT技术,对Python代码进行动态编译,所以可以显著提高Python代码的执行速度。在PyPy中,同样也是存在GIL锁的。
Jython
Jython是运行在Java平台上的Python解释器,可以直接把Python代码编译成Java字节码执行。不存在GIL锁。
IronPython
IronPython和Jython类似,只不过IronPython是运行在微软.Net平台上的Python解释器,可以直接把Python代码编译成.Net的字节码。不存在GIL锁。
GIL虽然是一个假的多线程,但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程。