多线程爬虫

多线程介绍

多线程时为了同步完成多项任务,通过提高资源使用效率来提高系统的效率。线程是在同一时间需要完成多项任务的时候实现的。就好比多线程是火车的每一节车厢,而进程就是火车,车厢离开火车无法启动。多线程的出现是为了提高效率。

threading模块介绍

threading模块是python中专门提供用来多线程编程的模块。threading模块中最常用的类是Thread。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

def coding():
for x in range(3):
print(f"{x} 正在写代码")
time.sleep(1)

def drawing():
for x in range(3):
print(f"{x} 正在画图")
time.sleep(1)

def multi_thread():
t1 = threading.Thread(target=coding)
t2 = threading.Thread(target=drawing)

t1.start()
t2.start()

if __name__ == "__main__":
multi_thread()

查看线程数

使用threading.enumerate()函数可以看到当前线程的数量

查看当前线程的名字

使用threading.current_thread()函数可以看到当前线程的信息。

继承自threading.Thread类

为了让线程代码得到更好的封装,可以使用到threading模块下的Thread类,继承自Thread类后,然后实现run方法,线程代码会自动运行run方法中的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

class CodingThread(threading.Thread):
def run(self):
for x in range(5):
print(f'{threading.current_thread()} 正在写代码')
time.sleep(1)

class DrawingThread(threading.Thread):
def run(self):
for x in range(3):
print(f'{threading.current_thread()} 正在画图')
time.sleep(1)

def multi_thread():
t1 = CodingThread()
t2 = DrawingThread()

t1.start()
t2.start()

if __name__ == "__main__":
multi_thread()

多线程共享全局变量的问题

多线程都是在同一个进程中运行的,因此在进程中的全局变量多有线程都是可共享的。由于线程执行的顺序是无序的,线程如果同时执行,有可能会造成数据错误。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading

tickets = 0

def get_ticket():
global tickets
for x in range(1000000):
tickets += 1
print(f"tickets: {tickets}")

def main():
for x in range(2):
t = threading.Thread(target=get_ticket)
t.start()

if __name__ == "__main__":
main()

锁机制

为了解决以上使用共享全局变量的问题,threading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程此时无法使用,直到当前线程处理完成后,把锁释放,其他线程才能使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading

tickets = 0
lock = threading.Lock()

def get_ticket():
global tickets
lock.acquire()
for x in range(1000000):
tickets += 1
lock.release()
print(f"tickets: {tickets}")

def main():
for x in range(2):
t = threading.Thread(target=get_ticket)
t.start()

if __name__ == "__main__":
main()

_注意_:lock锁加在修改全局变量的位置,如果只是访问全局变脸没有必要加锁。

Lock生产者和消费者模式

生产者和消费者模式是多线程开发中经常看到的一种模式。生产者的线程专门用来生产一些数据,然后存放在一个中间变量中。消费者再从这个中间变量中取出数据进行消费。但是因为要使用中间变量,中间变量由于经常是些全局变量,因此在使用的时候需要使用Lock锁来保证数据完整性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import threading
import random
import time

gMoney = 1000
gLock = threading.Lock()
# 记录生产者生产次数,在达到10次时停止生产
gTimes = 0

class Producer(threading.Thread):
def run(self):
global gMoney
global gTimes
while True:
money = random.randint(100, 1000)
gLock.acquire()
if gTimes >= 10:
gLock.release()
break
gMoney += money
print(f"{threading.current_thread()} 当前存入 {money} 元,剩余 {gMoney} 元")
gTimes += 1
time.sleep(1)
gLock.release()

class Consumer(threading.Thread):
def run(self):
global gMoney
global gTimes
while True:
money = random.randint(100, 500)
gLock.acquire()
if gMoney > money:
gMoney -= money
print(f"{threading.current_thread()} 取出 {money} 元,剩余 {gMoney} 元")
time.sleep(1)
else:
if gMoney >= 10:
gLock.release()
break
print(f"{threading.current_thread()} 想取出 {money} 元,剩余 {gMoney} 元,不足!")
gLock.release()

def main():
for x in range(5):
Consumer(name=f"消费者线程{x}").start()

for x in range(5):
Producer(name=f"生产者线程{x}").start()

if __name__ == "__main__":
main()

Condition生产者和消费者模式

Lock中的生产者与消费者模式可以正常运行,但是存在一个问题,在消费者中,总是通过while死循环上锁的方式判断money的多少。由于上锁是一个很消耗CPU资源的行为,因此此方式不是最好的。还有一种更好的方式是使用threading.Condition来实现。threading.Condition可以在没有数据的时候处于阻塞等待状态。一旦有合适的数据,还可以使用notify相关的函数来通知其他处于等待状态的线程。这样就可以不用做一些无用的上锁和解锁的操作,还可以提升程序的性能。threading.Condition类似threading.Lock,可以在修改全局数据的时候进行加锁,也可以在修改完毕后解锁。以下对部分常用函数进行简单介绍:

  1. acquire:上锁
  2. release:解锁
  3. wait:将当前线程处于等待状态,并且释放锁。可以被其他线程使用notify和notify_all函数进行唤醒。被唤醒后会继续等待上锁,上锁后继续执行下面的代码。
  4. notify:通知某个正在等待的线程,默认是第一个等待的线程。
  5. notify_all:通知所有正在等待的线程。notify和notify_all不会释放锁。并且需要在release之前调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import threading
import random
import time

gMoney = 1000
gCondition = threading.Condition()
# 记录生产者生产次数,在达到10次时停止生产
gTimes = 0
gTotalTimes = 5

class Producer(threading.Thread):
def run(self):
global gMoney
global gCondition
global gTimes
while True:
money = random.randint(100, 1000)
gCondition.acquire()
if gTimes >= gTotalTimes:
gCondition.release()
print(f"当前生产者总共生产了{gTimes}次")
break
gMoney += money
print(f"{threading.current_thread()} 当前存入 {money} 元,剩余 {gMoney} 元")
gTimes += 1
time.sleep(1)
gCondition.notify_all()
gCondition.release()

class Consumer(threading.Thread):
def run(self):
global gMoney
global gCondition
global gTimes
while True:
money = random.randint(100, 500)
gCondition.acquire()
while gMoney < money:
if gTimes >= gTotalTimes:
gCondition.release()
return
print(f"{threading.current_thread()} 想取出 {money} 元,剩余 {gMoney} 元,不足!")
gCondition.wait()

gMoney -= money
print(f"{threading.current_thread()} 取出 {money} 元,剩余 {gMoney} 元")
time.sleep(1)
gCondition.release()

def main():
for x in range(5):
Consumer(name=f"消费者线程{x}").start()

for x in range(5):
Producer(name=f"生产者线程{x}").start()

if __name__ == "__main__":
main()

Queue线程安全队列

在线程中,访问一些全局变量,加锁是一个经常的过程。如果你想把一些数据存储到某个队列中,可以使用Python内置的Queue模块。Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列、LIFO(后进先出)队列。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列实现线程间的同步。相关函数:

  1. 初始化Queue(maxsize):创建一个先进先出的队列
  2. qsize():返回队列的大小
  3. empty():判断队列是否为空
  4. full():判断队列是否满了
  5. get():从队列中取最后一个数据
  6. put():将一个数据放到队列中

使用生产者消费者模式多线程下载表情包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
## demo.py

import threading
import requests
import os
import re
from lxml import etree
import util
from queue import Queue

class Producer(threading.Thread):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
}

def __init__(self, page_queue, img_queue, *args, **kwargs):
super(Producer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.img_queue = img_queue

def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)

def parse_page(self, url):
response = requests.get(url, headers=self.headers)
text = response.text
html = etree.HTML(text)
imgs = html.xpath('//a[@class="col-xs-6 col-sm-3"]/img')
for img in imgs:
if img.get('class') == 'gif':
continue
img_url = img.get('data-original')
suffix = os.path.splitext(img_url)[1].replace('.null', '.jpg')
alt = img.get('alt')
alt = re.sub(r'[,。?!!?,/\\.]', '', alt)
img_name = alt + suffix
self.img_queue.put((img_url, img_name))


class Consumer(threading.Thread):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
}

def __init__(self, page_queue, img_queue, *args, **kwargs):
super(Consumer, self).__init__(*args, **kwargs)
self.page_queue = page_queue
self.img_queue = img_queue

def run(self):
while True:
if self.img_queue.empty():
if self.page_queue.empty():
return
img = self.img_queue.get(block=True)
url, filename = img
if not os.path.exists('images'):
os.makedirs('images/')
result = util.retrieve(url, headers=self.headers, path='images/'+filename)
if result:
print(f'{filename} 下载完成!')
else:
print(f'{filename} 下载失败!')

def main():
page_queue = Queue(100)
img_queue = Queue(500)
for x in range(1, 10):
url = f'http://www.doutula.com/photo/list/?page={x}'
page_queue.put(url)

for x in range(5):
t = Producer(page_queue, img_queue)
t.start()

for x in range(5):
t = Consumer(page_queue, img_queue)
t.start()

if __name__ == '__main__':
main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# util.py

from urllib import parse, request

def retrieve(url, headers=None, path=None):
if not isinstance(headers, dict):
return False

opener = request.build_opener()
opener.addheaders = [result for result in zip(headers.keys(), headers.values())]
request.install_opener(opener)
request.urlretrieve(url, path)

return True

def urlopen(url, headers=None, data=None,origin_req_host=None, unverifiable=False,
method=None):
req = request.Request(url, headers=headers, data=data, origin_req_host=origin_req_host,
unverifiable=unverifiable, method=method)
resp = request.urlopen(req)
return resp

GIL全局解释器锁

Python自带的解释器是CPython。CPython解释器的多线程实际上是一个伪多线程(在多核CPU中,只能利用一核,无法利用多核的优势)。同一时刻只有一个线程执行,为了保证同一时刻只有一个线程在执行,在CPython解释器中便引入了GIL(Global Intepreter Lock),叫做全局解释器锁。因为在CPython解释器的内存管理不是线程安全的,所以还有其他的解释器。

  • Ipython

    Ipython是基于CPython之上一个交互解释器,IPython只是在交互方式上有所增强,但是执行Python代码的功能和CPython是完全一样的。

  • PyPy

    PyPy是另一个Python解释器,它的目标是执行速度,PyPy采用JIT技术,对Python代码进行动态编译,所以可以显著提高Python代码的执行速度。在PyPy中,同样也是存在GIL锁的。

  • Jython

    Jython是运行在Java平台上的Python解释器,可以直接把Python代码编译成Java字节码执行。不存在GIL锁。

  • IronPython

    IronPython和Jython类似,只不过IronPython是运行在微软.Net平台上的Python解释器,可以直接把Python代码编译成.Net的字节码。不存在GIL锁。

GIL虽然是一个假的多线程,但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程。