It has been 589 days since the last update, the content of the article may be outdated.

程序实例

python
3.8.6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import threading
import queue
import time

my_queue = queue.Queue()


def worker():
while True:
item = my_queue.get()
if item is None:
break
# 处理任务...
print(f"Processing: {item}, now queue_size:{my_queue.qsize()}")
# time.sleep(0.1)
my_queue.task_done()


t1 = time.time()


# 创建并启动多个工作线程
for _ in range(3):
threading.Thread(target=worker).start()

# 向队列中添加任务
for i in range(9):
my_queue.put(i)

print(f"work over")

# 等待所有任务完成
my_queue.join()

# 停止工作线程
for _ in range(3):
my_queue.put(None)

t2 = time.time()

print(f"Both threads have finished. total time: {t2-t1}")

输出结果

plaintext
1
2
3
4
5
6
7
8
9
10
11
work overProcessing: 0, now queue_size:8
Processing: 1, now queue_size:7

Processing: 2, now queue_size:6
Processing: 3, now queue_size:5
Processing: 4, now queue_size:4
Processing: 5, now queue_size:3
Processing: 6, now queue_size:2
Processing: 7, now queue_size:1
Processing: 8, now queue_size:0
Both threads have finished. total time: 0.0009996891021728516

程序分析

threading.Thread 创建线程任务

threading.Thread(target=worker).start()用于启动线程任务,在本程序中借助for循环创建了3个线程任务。

queue 进行线程信息获取

类实例my_queue通过put和get方法实现传递信息,qsize可用于检查当前队列长度,task_done用于给当前在队列中的第一个标定为完成,如队列有多个值,需多次调用方法注意确认完毕才可,注意没有task_done执行qsize也会随着get方法调用减少。join方法在队列信息没有全部完成时会对主线程进行阻塞。

threading.Thread也有join阻塞方法,但在该程序中已有堵塞线程的适合的方法可不用。

以下为一个关于queue类的小程序

py
1
2
3
4
5
6
7
8
9
10
11
12
13
import queue

queue = queue.Queue()
for i in range(5):
queue.put(1)
for i in range(5):
c = queue.get()
for i in range(5):
if i != -1:
queue.task_done()

queue.join()
print(123)
plaintext
1
123

实际上,在稍微修改代码 if i != -1:if i: 后程序便始终都在堵塞,因为task_done并没有全队列得到确认。

程序执行顺序

你会发现在输出中 work over 最先输出,可能是线程调用函数的速度慢于print。等join阻塞解除后在执行print语句。

说到线程的共享资源,自然就离不开锁的概念。

来一段简单的程序实例

py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import threading

# 创建一个线程锁
my_lock = threading.Lock()

# 共享变量
counter = 0


def increment():
global counter
for _ in range(1000000):
# with my_lock:
# temp = counter
# temp += 1
# counter = temp
temp = counter
temp += 1
counter = temp


# 创建并启动多个线程
threads = []
for _ in range(4):
thread = threading.Thread(target=increment)
threads.append(thread)
thread.start()

# 等待所有线程完成
for thread in threads:
thread.join()

print("Counter value:", counter)

plaintext
1
Counter value: 3050932

解释

结果是不是与理论不符?因为多线程的多次并发执行容易产生对公共的资源竞争,对此,我们需要利用锁这一工具来规划线程资源使用。在这里我们使用了线程锁。

线程锁(Lock):
线程锁是最简单的锁,也被称为互斥锁(Mutex)。它在任意时刻只允许一个线程持有锁,其他线
程在尝试获得锁时会被阻塞。当持有锁的线程释放锁后,其他线程才能获得锁并继续执行。线程锁
适用于简单的同步需求,但要注意使用不当可能导致死锁。

在使用注释里的写法后你会发现输出结果为4000000,但执行时间会便得慢很多,从线程锁的定义来看不难理解。

死锁

锁具有很高的权限,能够阻塞其他线程对该资源的获取,但假如有以下情况,锁倒成了“罪魁祸首”。

假设有两把锁LK,A线程获取了L,B线程获取了K,在AB没释放锁的情况下AB又分别要获取锁KL。因为A没获得K,所以A还在阻塞等待K的释放,同理B阻塞等待L的释放。这两个线程在相互作用下无响应了。这种情况称之为死锁。

来段代码展示下

py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time

# 创建两个锁
lock1 = threading.Lock()
lock2 = threading.Lock()


def function1():
with lock1:
print("Function 1 holding lock 1...")
# 等待一段时间,模拟执行耗时操作
time.sleep(1)
print("Function 1 waiting for lock 2...")
with lock2:
print("Function 1 holding both locks!")


def function2():
with lock2:
print("Function 2 holding lock 2...")
# 等待一段时间,模拟执行耗时操作
time.sleep(1)
print("Function 2 waiting for lock 1...")
with lock1:
print("Function 2 holding both locks!")


# 创建并启动两个线程
thread1 = threading.Thread(target=function1)
thread2 = threading.Thread(target=function2)
thread1.start()
thread2.start()

# 等待两个线程结束
thread1.join()
thread2.join()

print("Both threads have finished.")

plaintext
1
2
3
4
5
Function 1 holding lock 1...
Function 2 holding lock 2...
Function 1 waiting for lock 2...
Function 2 waiting for lock 1...

整个程序无法自然结束

总结

python有时可以通过多线程的灵活使用来提高程序执行效率,但也有情况是不适合的,在线程并发活跃时可以考虑添加锁,但也要权衡数据的稳定和效率。