人生苦短
我用Python

Python021-线程|多线程|线程池

Python021-线程|多线程|线程池

1.线程基础

1.1 什么是线程?

线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。在Unix System VSunOS中也被称为轻量进程(lightweight processes),但轻量进程更多指内核线程(kernel thread),而把用户线程(user thread)称为线程。

线程是独立调度和分派的基本单位。线程可以操作系统内核调度的内核线程,如Win32线程;由用户进程自行调度的用户线程,如Linux平台的POSIX Thread;或者由内核与用户进程,如Windows 7的线程,进行混合调度。

同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。

一个进程可以有很多线程,每条线程并行执行不同的任务。

在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责IO处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。

上面是维基百科的解释!

(⊙v⊙)嗯。。。好像还是有点不太明白!

换句话说,线程是进程中的可执行代码流函数或被封装的控制语句)的序列有序代码片段),该序列被操作系统调度,并在处理器或内核上运行。

序列,让我想起了高中时学的基因片段

下面结合线程的结构图来理解:

Python文件如下:

import  time
import threading

X = 'Siffre'
Y = '三弗'

def func1(args):
first_var = 1
print(args)

def func2(args):
first_var = 2
time.sleep(5)
print(args)

def main():
A = threading.Thread(target=func1, args=(123,))
B = threading.Thread(target=func2, args=(789,))
A.start()
B.start()
A.join()
B.join()

main()#入口函数

参考《C+++多核编程》中的原图,原理是一样的,只需要将源码替换即可。

上图中的数据段内容其实就是该进程中的全局变量

注意:

  1. 一个应用程序,可以多进程、也可以多线程。

  2. 默认情况下,一个Python脚本是单进程、单线程的。

  3. 所有的进程都有一个主线程,主线程是进程的控制流或执行线路,具有多个线程的进程是多线程的。其实,线程分为用户级线程和内核级线程。

1.2 线程特点

  • 线程可以读写它所属进程的全局声明变量
  • 进程中一个线程做出的任何改动都可以被进程中的所有线程以及主线程获得
  • 线程将大部分的资源同相同进程中的其他线程进行共享

    线程必须共享其他资源,如:处理器、内存、文件描述符等,文件描述符是单独为每个进程分配的,相同进程中的线程将竞争对这些描述符发使用权。线程可以分配额外的资源,例如文件或互斥量,但是进程中所有的线程都可以访问它们。一个进程能够消耗的资源是受限制的,以致于该进程中所有线程的全部资源不能超过该进程的限制资源

1.3 线程作用

提高并发能力,利用多核特性,增加应用程序的吞吐量(I/O)。

在Python中,由于GIL的限制,不能利用多核的特性。每核只能处理一个线程,但是在I/O方向是没问题的。

GIL全局解释器锁

对于Python有如下建议:

  • 对于I/O密集型操作,极少数占用CPU,使用多线程操作,能提高效率
  • 对于计算密集型操作,大量占用CPU,使用多进程操作,能提高效率

在知道如何使用线程之前,还有一个比较重要的概念,需要搞清楚:主线程!不过上来就介绍主线程,我觉得会有点懵逼,不如先在懵逼中探索,先了解一下线程的基本使用,在引出主线程的概念。

2. 线程使用

2.1 如何创建线程?

Python中,已经为我们提供了现成的模块:threading

threading库可用于创建、维护和管理多线程程序和应用中的线程。当创建一个多线程程序时,可以在进程的执行期间的任何时候创建线程,因为他们是动态的。

创建线程有两种方式:

  • 直接式
  • 继承式

直接式:

import  threading
import time

def func(num):
print('running on number:%s' %num)
time.sleep(3)

if __name__ == "__main__":
t1 = threading.Thread(target=func, args=(1,))
t2 = threading.Thread(target=func, args=(2,))

t1.start() #并不代表当前被立即被执行,系统来决定
t2.start()

继承式

import  threading
import time

class MyThread(threading.Thread):
def __init__(self, num):
threading.Thread.__init__(self)
self.num = num

def run(self):
print('running on number:%s' %self.num)
time.sleep(3)

if __name__ == "__main__":
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

上述两种方式的结果是一样的!

2.2 如何调用线程?

同样线程的调用也有两种方式:

  • 直接调用
  • 自定义类调用

直接调用:

import  threading
import time

def func(num):
print('running on number:%s' %num)
time.sleep(3)

if __name__ == "__main__":
t1 = threading.Thread(target=func, args=(1,))
t2 = threading.Thread(target=func, args=(2,))

t1.run() #线程被cpu调度后自动执行线程对象的run方法
t2.run()

自定义类调用:

import  threading
import time

class MyThread(threading.Thread):
def __init__(self, num):
super(MyThread,self).__init__()
self.num = num

def run(self):
print('running on number:%s' %self.num)
time.sleep(3)

if __name__ == "__main__":
t = MyThread(1)
t.run()

后面的方式比较Pythonic

2.3 线程方法介绍

  • start()准备就绪,等待调度—-不代表当前线程不会被立即执行,而是等待CPU调度
  • setName()为线程设置名称
  • getName()获取线程名称
  • setDaemon(True) 设置为守护线程,必须在start()之前设置。True表示主线程不等待子线程,执行完自己的任务后,自动关闭,子线程有可能未执行完毕。默认情况下,主线程要等待子线程执行完毕再关闭。由于线程会无限循环,所以设置为守护线程,这样当进程结束时,线程也将被销毁。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon()方法啦 。
  • join([timeout])阻塞当前上下文环境的线程,直到调用此方法的线程终止或达到指定的超时时间。 也就是说,如果不想让线程并发的操作,表示主线程到此等待,直到子线程完毕;如果加上参数,表示主线程最多等待多少秒数。也就是说,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。该方法使得多线程变得无意义
  • run()线程被CPU调度后自动执行线程对象的run()方法
  • currentThread()返回当前的线程变量
  • enumerate()返回一个包含正在运行(指线程启动后,结束前,不包括启动前和终止后)的线程的列表。
  • activeCount()返回正在运行的线程数量
  • isAlive()返回线程是否活动的

3. 主线程

上面我们学习了一些线程的基本使用,通过一些应用来理解主线程的概念是很有必要的,在了解主线程之前,先来看看join()方法的实例:

  1. 不设置join()
import time
import threading

class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread {}, @number: {}'.format(self.name, i))
time.sleep(1)

def main():
print("Start main threading")
# 创建三个线程
threads = [MyThread() for i in range(3)]
# 启动三个线程
for t in threads:
t.start()

print("End Main threading")


if __name__ == '__main__':
main()

结果如下:

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
End Main threading
#主线程结束,子线程仍在运行(下面的顺序,不同机器不同顺序)
thread Thread-3, @number: 1
thread Thread-1, @number: 1
thread Thread-2, @number: 1
thread Thread-1, @number: 2
thread Thread-3, @number: 2
thread Thread-2, @number: 2
thread Thread-3, @number: 3
thread Thread-1, @number: 3
thread Thread-2, @number: 3
thread Thread-1, @number: 4
thread Thread-3, @number: 4
thread Thread-2, @number: 4
  1. 设置join()
    这里join()的作用不是很清晰,看看下面的:
import   threading
import time

class MyThread(threading.Thread):
def run(self):
for i in range(5):
print('thread {}, @number: {}'.format(self.name, i))
time.sleep(1)

def main():
print("Start main threading")
# 创建三个线程
threads = [MyThread() for i in range(3)]
# 启动三个线程
for t in threads:
t.start()
#执行join
for t in threads:
t.join()
print("End Main threading")

if __name__ == '__main__':
main()

结果如下:

Start main threading
thread Thread-1, @number: 0
thread Thread-2, @number: 0
thread Thread-3, @number: 0
thread Thread-3, @number: 1
thread Thread-1, @number: 1
thread Thread-2, @number: 1
thread Thread-1, @number: 2
thread Thread-2, @number: 2
thread Thread-3, @number: 2
thread Thread-3, @number: 3
thread Thread-2, @number: 3
thread Thread-1, @number: 3
thread Thread-2, @number: 4
thread Thread-1, @number: 4
thread Thread-3, @number: 4
End Main threading

线程之间变得有顺序了!主要是因为子线程调用join()方法,使得主线程被阻塞住,等待全部子线程执行完毕,主线程才可以结束。

其中的过程又是怎样的呢?其实很好理解,就是字面上的意思:子线程加入join)主线程,也就是每个子线程由被后面新建的子线程给阻塞住了,以致于整个主线程也被阻塞住,最后,线程之间也就是变得有顺序了。

先来总结一下join()方法吧:

  • join()方法作用:阻塞主线程,专注执行多线程
  • 多线程多join()情况下,依次执行各线程的join()方法,使得执行变得有序,前面一个结束才能执行后面一个。
  • 无参数,则等待该线程结束,才执行下一个线程的join()

上面的过程也可以称之为线程的合并

看完上面的应该清晰一点点了。那么问题又来了,之前所说的主线程是什么鬼?使用最开始的那段代码来解释吧。

import  time
import threading

X = 'Siffre'
Y = '三弗'

def func1(args):
first_var = 1
print(args)

def func2(args):
first_var = 2
time.sleep(5)
print(args)

def main():
A = threading.Thread(target=func1, args=(123,))
B = threading.Thread(target=func2, args=(789,))
A.start()
B.start()
A.join()
B.join()

main()#主线程

该实例是简单的多线程程序有一个主线程以及线程将要执行的函数。

主线程是BOSS线程。

  1. BOSS线程声明两个线程,即ThreadAThreadB
  2. threading.Thread()创建线程并将他们同将要执行的任务关联起来,处于一种待激活状态。threading.Thread()使得在主线程的控制流中产生了分支,增加了两个并发执行的控制流,分别是ThreadAThreadB
  3. func1func2这两个任务分别被发送到标准输出。
  4. threading.Thread()对象通过调用start()方法,启动线程。
  5. join()方法使得,主线程等待,直到两个线程都返回。

如下图:

至此,对线程、主线程、子线程的概念及关系以及相关方法的有了一定的了解,下面就来进一步深入学习。

4. 线程类型

经过上面的一些了解,我们发现线程有几种类型:

  • 主线程
    当一个程序启动时,就有一个进程被操作系统创建,与此同时一个线程也立刻运行,该线程通常叫做程序的主线程。每个进程至少都有一个主线程,主线程通常最后关闭。

  • 子线程
    在程序中创建的其他线程,相对于主线程来说就是子线程。

  • 守护线程
    线程的一种标识,守护线程为其他线程提供服务,如垃圾回收线程。当剩下的全部是守护线程时,进程退出。

  • 前台线程
    相对于守护线程的其他线程为前台线程。

这里也算是对前面的一些小结吧!

5. 线程状态

线程是当进程被调度执行时的执行单元。如果进程中只有一个线程,该线程是指派到处理器内核的主线程。如果有多个线程,而且对于该进程有多个处理器可用,那么所有的线程都会被指派到处理器上。(当然在Python中就别指望线程多核利用了,可以考虑多进程!)

当线程被调度到处理器内核上执行时,它会改变自身的状态。线程的状态是指在任意指定时间所处的模式或情形。

当我们创建线程后,线程并不是始终保持一个状态。其状态如下:

  • New创建
  • Runable就绪,等待调度
  • Running运行
  • Blocked阻塞,阻塞可能在wait,locked,sleeping
  • Dead消亡

图片引内心求法博客

当线程遇到阻塞时,可能会有以下三种情况:

  • 同步
    线程中获取同步锁,但是资源已经被其他线程锁定时,进入Locked状态,直到该资源可获取(获取顺序由Lock队列控制)

  • 睡眠
    线程运行sleep()join()方法后,线程进入Sleeping状态。区别在于sleep等待固定的时间,而join是等待子线程执行完。当然join可以指定超时时间。

  • 等待
    线程中执行wait()方法后,线程进入Waiting状态,等待其他线程的通知notify

主线程可以决定整个进程的状态。如果主线程是唯一的线程,则主线程的状态通进程状态相同。如果主线程在休眠,进程也在休眠。如果主线程在运行,进程也在运行。对于有多个线程的进程,只有进程中所有的线程都处于休眠或停止状态时,我们才能够认为整个进程休眠或停止。如果有一个线程是活动的(可运行或运行),那么进程就是活动的。

6. 守护线程

前面,我们用join()做引子来认识主线程,接下来同样用一个引子认识一个概念。

看看setDaemon()的示例:

import threading
import random
import time

class MyThread(threading.Thread):
def run(self):
wait_time = random.randrange(1, 10)
print("%s will wait %d seconds" %(self.name, wait_time))
time.sleep(wait_time)
print("%s finished!" % self.name)

if __name__ == "__main__":
print('main thread is waitting for exit...')

for i in range(5):
t = MyThread()
t.setDaemon(True)
t.start()

print('main thread finished!')

执行结果:

main thread is waitting for exit...
Thread-1 will wait 4 seconds
Thread-2 will wait 8 seconds
Thread-3 will wait 6 seconds
Thread-4 will wait 4 seconds
Thread-5 will wait 2 seconds
main thread finished!

可以看出,主线没有等待子线程的执行,而直接退出!

默认情况下,主线程在退出时会等待所有子线程的结束。如果希望主线程不等待子线程,而是在退出时自动结束所有的子线程,就需要设置子线程为后台线程(daemon)即守护线程。可以通过setDaemon()方法设置。

7.线程锁

通过对线程的学习,我们知道线程是共享同一进程中的内存地址,同时线程之间并没有任何顺序可言,进行随机调度,肯定会出现抢占资源的情况。这样会引发一些隐患,当多个线程同时修改一条数据(全局变量)时,可能会出现脏数据,导致数据结果与实际不一致。

为解决这个问题,我们需要给线程加把锁,在同一时刻,工友数据(全局变量)只允许指定的线程执行操作。

在Python中,为我们提供了两种锁:LockRLock

  • Lock一般的锁(单层加锁)
  • RLock递归锁(多层加锁)

在加锁这里,Python2.x和Python3.x有很大的差距的,Python3.x做了很大的优化,基本上是没有数据的错乱,但还是要加锁的!

Lock单层锁

我们需要在Python2.x看不加锁的结果:

import time
import threading

def addNum():
global num # 在每个线程中都获取这个全局变量
print('--get num:', num)
time.sleep(1)
num -= 1 # 对此公共变量进行-1操作

num = 100 # 设定一个共享变量
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: # 等待所有线程执行完毕
t.join()
print('final num:', num)

正常情况,这个结果应该是0,但是每次运行的结果都不相同。原因何在?What The Fuck,因为多个线程同时对一个变量进行更改,致使数据错误!为了避免这种情况我们需要加锁,来限制访问的线程。

我们通常会使用Lock(单层锁):

import time
import threading

def addNum():
global num #在每个线程中都获取这个全局变量
print('--get num:',num )
time.sleep(1)
lock.acquire() #修改数据前加锁
num -=1 #对此公共变量进行-1操作
lock.release() #修改后释放

num = 100 #设定一个共享变量
thread_list = []
lock = threading.Lock() #生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)

for t in thread_list: #等待所有线程执行完毕
t.join()
print('final num:', num )

这样的结果,就都是0啦!

当我们使用Pytohn3.x时,不加锁和加锁的结果是一样的,不过默认情况下,Py3.x并没有加锁,我们还是有加锁的必要的!

每次都要手动加锁和解锁,好蛋疼a,时不时的忘记解锁就导致死锁问题!不过,Python大法就是好,提供了with自动加锁和解锁!

import  threading

class SharedCounter:

def __init__(self, init_value=0):
self.init_value = init_value
self._value_lock = threading.Lock()

def incr(self, delta=1):
with self._value_lock:
self.init_value += delta

def decr(self, delta=1):
with self._value_lock:
self.init_value -= delta

这是官网上的例子。

有了它,加锁时,我们就可以解放双手~(≧▽≦)/~啦啦啦!

官方大法:Lock 对象和 with 语句块一起使用可以保证互斥执行,就是每次只有一个线程可以执行 with 语句包含的代码块。with 语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁。

到这里,有朋友可能会问,互斥锁呢?我只能告诉你上面的就是互斥锁!看下图就明白了:

看完图,就来解释一下,两个概念吧:

  • 互斥锁同步
    多线程编程的最常见问题:数据共享。当多个线程都修改某一个共享数据的时候,需要进行同步控制。
    线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

    这就是流传已久的互斥锁同步解释!其实就是我们上面实现的锁机制。

  • 同步阻塞
    当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”
    直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

继续探讨:

线程本质上是不确定。因此,在多线程程序中错误的使用锁机制可能回导致随机数据损坏或者其他异常行为,我们称之为竞争条件。为了避免竞争条件,最好在临界区(对资源进行操作的那部分代码)使用锁。在一些“老的”Python代码中,显示的获取和释放锁是很常见的。如下:

import threading

class SharedCounter:

def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock()

def incr(self,delta=1):
self._value_lock.acquire()
self._value += delta
self._value_lock.release()

def decr(self,delta=1):
self._value_lock.acquire()
self._value -= delta
self._value_lock.release()

相对这种显示调用的方法,with语句更加优雅,也不容易出错,特别是“我们这类人”可能回忘记调用release()方法或者程序在获得锁之后产生异常这两种情况(使用with可以保证在这两种情况下仍能正确释放锁)。

为了避免出现死锁的情况,使用锁机制的程序应该设定为每个线程一次只允许获得一个锁。如果不这样做的话,我们就需要更高级的锁机制来避免死锁机制–RLock递归锁

RLock递归锁

在接触RLock递归锁之前,我们先要知道死锁是怎么造成的?

在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。

在线程间共享多个资源时,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁现象。尽管死锁很少发生,但时一旦出现就会造成应用的停止响应—进程死掉。

举个例子:一个子线程获取了第一个锁,然后在获取第二个锁的时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。

import  threading
import time
class MyThread(threading.Thread):
def task1(self):
global resA, resB
if mutexA.acquire():
msg = self.name + ' got resA'
print(msg)
time.sleep(1)
if mutexB.acquire():
msg = self.name + ' got resB'
print(msg)
mutexB.release()
mutexA.release()

def task2(self):
global resA, resB
if mutexB.acquire():
msg = self.name + ' got resB'
print(msg)
time.sleep(1)
if mutexA.acquire():
msg = self.name + ' got resA'
print(msg)
mutexA.release()
mutexB.release()

def run(self):
self.task1()
self.task2()

resA = 0
resB = 0

mutexA = threading.Lock()
mutexB = threading.Lock()

def main():
for i in range(2):
t = MyThread()
t.start()

if __name__ == '__main__':
main()

上面的实例中,每个线程都需要执行两个任务,两个任务都需要获取锁,然而两个任务先得到锁后,就需要等另外锁释放,有点描述不太清楚,还是来看图吧:

也就是说,线程1的锁a还没释放,同时线程2的锁a也没有释放,都在等待对方解锁,等着、等着整个进程就死掉了。。。

为了解决多把锁导致的死锁问题,我们需要递归锁(可重入锁)

当我们使用递归锁时,只有一个线程可以使用完整的函数或者类中的方法,如下:

import threading

class SharedCounter:
_lock = threading.RLock()
def __init__(self, initial_value = 0):
self._value = initial_value

def incr(self,delta=1):
with SharedCounter._lock:
self._value += delta

def decr(self,delta=1):
with SharedCounter._lock:
self.incr(-delta)

这个例子中,没有对每一个实例中的可变对象加锁,取而代之的是一个被所有实例共享的类级锁。这个锁用来同步类方法,具体说就是,这个锁可以保证一次只有一个线程可以调用这个类的方法。不过,与一个标准的锁不同的是,已经持有这个锁的方法在调用同样适用这个锁的方法时,无需再次获取锁。也就是说,只有获得资源的线程所有的锁都被释放后,其他的线程才能获得资源。

如下图所示:

对于加锁的使用,官方有如下建议:

官方建议:RLock 对象,根据以往经验,这些是用于一些特殊的情况,如果你只是需要简单地对可变对象进行锁定,那就不应该使用它们。

那该如何确定使用哪个锁机制呢?只能根据实际的需求了,能简单就不要复杂化!

推荐使用递归锁,尽可能避免死锁的发生!强烈推荐使用with大法!

8.条件变量

我们使用前面的锁,可以实现实现线程同步。当更复杂的环境时,我们就需要针对锁进行一些条件判断。Python提供了Condition对象,它除了具有acqurie()release()方法之外,还提供了wait()notify()方法。

可以认为Condition对象维护了一个锁和一个等待池(waiting)。

线程通过Condition对象调用内部acquire属性获得锁,调用wait()方法时,线程会释放Condition内部的锁并进入blocked状态,同时waiting池中记录这个线程。当调用notify()方法时,Conditon对象会从waiting等待池中随机挑选一个线程,通知其调用acquire属性来获取锁。

这里若是不明白,可以读读源码!

Condition对象的构造函数可以接受一个锁Lock/RLock对象作为参数,默认情况,Condition对象会在内部自行创建一个RLock

class Condition:

def __init__(self, lock=None):
if lock is None:
lock = RLock() #内部创建锁
self._lock = lock
# Export the lock's acquire() and release() methods
self.acquire = lock.acquire
self.release = lock.release
...

Condition对象还提供了notifyAll()方法,可以通知waiting等待池中的所有线程去获取acquire属性对应的锁。此法目的是防止有线程永远处于沉默状态。

演示条件变量同步的经典实例:生产者与消费者模型。
假设有一群生产者和一群消费者,通过一个市场来交互产品。生产者“策略”是如果市场上剩余的产品少于1000个,那么就生产100个产品放到市场上,而消费者的“策略”是如果市场上产品的数量多于100个,那么就消费3个产品。

import threading
import time

class Producer(threading.Thread):
def run(self):
global count
while True:
with con._lock:
if count > 1000:
con.wait()
else:
count = count+100
msg = self.name+' produce 100, count=' + str(count)
print(msg)
con.notify()
time.sleep(1)

class Consumer(threading.Thread):
def run(self):
global count
while True:
with con._lock:
if count < 100:
con.wait()
else:
count = count-3
msg = self.name+' consume 3, count='+str(count)
print(msg)
con.notify()
time.sleep(1)

count = 500
con = threading.Condition()

def main():
for i in range(2):
p = Producer()
p.start()
for i in range(10):
c = Consumer()
c.start()
if __name__ == '__main__':
main()

9. 队列

前面我们学习了条件变量解决了线程间的单一数据同步的问题。

当我们考虑更复杂的场景时,单单一条数据同步是远远不够的,此时,我们需要更加高级的方式来解决这个问题。

Python的Queue模块中提供了同步的、线程安全的队列queue类,包括:

  • FIFO先入先出队列
  • LIFO后入先出队列
  • Priority优先级队列

嘿嘿!队列详细内容请看另一篇文章:对列

既然解决了复杂环境的数据同步问题,接下来就该探讨一下通信问题吧!

10. 线程通信

线程通信可能会有点混淆,说说和前面的条件变量的关系吧,线程通信:

  • 条件变量—Condition条件
  • 条件同步—Event事件

条件变量和条件同步(Event事件)意思差不多,只是多了锁功能,那是因为二者的作用目标不同:

条件变量(Condition:设计于访问共同资源的条件环境
条件同步(Event:设计于不访问共享资源的条件环境

在前面我们已经接触到了线程通信的条件变量(Condition)部分,这里就只学习条件同步(Event事件)喽!

那就先来看看它有什么方法吧:

#条件环境对象,初始值为False
event = threading.Event()
event.isSet() #返回event的状态值
event.wait() #如果event.isSet()==False将阻塞线程
event.set() #设置event的状态值为True,所有等待池的线程激活进入就绪状态,等待操作系统调度
event.clear() #恢复event的状态值为False

通过条件同步(Event事件)来实现两个或多个线程之间的交互,下面是一个红绿灯的例子:启动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按照红灯停,绿灯行的规则。

import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #绿灯状态
count = 0
while True:
if count < 10:
print('\033[42;1m--green light on---\033[0m')
elif count <13:
print('\033[43;1m--yellow light on---\033[0m')
elif count <20:
if event.isSet():
event.clear()
print('\033[41;1m--red light on---\033[0m')
else:
count = 0
event.set() #打开绿灯
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #绿灯
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()

再来看一个例子:员工进公司要刷卡,设置一个线程是门,在设置几个线程为员工,员工看到门没有打开,就刷卡,刷完卡,门开了,员工通过。

import threading
import time
import random

def door():
door_open_time_counter = 0
while True:
if door_swiping_event.is_set():
print("\033[32;1mdoor opening....\033[0m")
door_open_time_counter +=1

else:
print("\033[31;1mdoor closed...., swipe to open.\033[0m")
door_open_time_counter = 0 #清空计时器
door_swiping_event.wait()


if door_open_time_counter > 3:#门开了已经3s了,该关了
door_swiping_event.clear()

time.sleep(0.5)

def staff(n):

print("staff [%s] is comming..." % n )
while True:
if door_swiping_event.is_set():
print("\033[34;1mdoor is opened, passing.....\033[0m")
break
else:
print("staff [%s] sees door got closed, swipping the card....." % n)
print(door_swiping_event.set())
door_swiping_event.set()
print("after set ",door_swiping_event.set())
time.sleep(0.5)
door_swiping_event = threading.Event() #设置事件

door_thread = threading.Thread(target=door)
door_thread.start()

for i in range(5):
p = threading.Thread(target=staff,args=(i,))
time.sleep(random.randrange(3))
p.start()

可以看出基本的使用流程就是这样的:

11. 定时器

定时器,延迟多长时间(单位:秒)执行,Timer()是Thread的派生类,用于在指定时间后调用一个方法。

import threading

def hello():
print('hello,world!!')

t=threading.Timer(1,hello)
t.start()

没啥可介绍的,知道怎么用就行!

到这里基本的线程知识也快接近尾声了,是不是有种想吐的感觉。。。,兄弟们,坚持住,接下来才是重头戏!

12. 线程池

什么是线程池?

我们知道Web服务器,数据库服务器,文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。

每当一个请求到达服务器时就会创建一个新的服务对象,然后在新的服务对象中进行处理。但是,当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。为了提高服务器的效率,尽可能的减少创建和销毁对象的次数,特别是一些很好资源的对象创建和销毁。于是乎,引入了的概念:使得人们可以定制一定量的资源,然后对这些资源进行复用,而不是频繁的创建和销毁。

线程实现思路:

  1. 通过队列(先进先出队列,队列都是在内存中操作,进程退出,队列清空)来实现,线程池中没有线程时为阻塞状态。
  2. 自定义一个线程池类,构造方法时,创建一个指定元素数量的队列,队列中的元素为线程类
  3. 使用线程时,队列的get()方法得到一个线程类,使用__call__方法创建一个线程,使用线程执行指定的程序
  4. 程序执行完成后,在队列中添加一个新的线程类

简单的线程池模型:

实现代码:

import threading,time,queue

class ThreadPool:
def __init__(self,maxsize):
self.maxsize=maxsize
self._q=queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)

def get_thread(self):
return self._q.get()
def add_thread(self):
self._q.put(threading.Thread)

pool=ThreadPool(5)

def task(arg,p):
print(arg)
time.sleep(1)
p.add_thread()

for i in range(100):
t = pool.get_thread() #线程池中没有线程为阻塞状态
obj=t(target=task,args=(i,pool))
obj.start()

上述方式的缺点:没有将线程重复利用,要知道创建一个线程的耗时可能是一个线程执行的耗时好几倍。因此,我们需要使用另一种方式。

同样是使用队列,但队列中的元素为一个个(函数名,函数参数)的元祖,创建一个线程组成的列表,线程轮流去队列中取到元祖,分解后执行函数,然后取下一个函数。

import queue
import threading
import contextlib #上下文管理
import time

StopEvent = object()

class ThreadPool(object):

def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num) #创建队列,设置最大数
else:
self.q = queue.Queue() #创建队列,不设置
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []

def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
#空闲的线程为零,并且生成的线程数小于最大数值
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)

def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start()

def call(self):
"""
循环去获取任务函数并执行任务函数
"""
#当前线程添加到列表
current_thread = threading.currentThread
self.generate_list.append(current_thread)

event = self.q.get()
while event != StopEvent:

func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None

if callback is not None:
try:
callback(success, result)
except Exception as e:
pass

with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:

self.generate_list.remove(current_thread)

def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1

def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True

while self.generate_list:
self.q.put(StopEvent)

self.q.empty()

@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
# status, execute action status
# result, execute action return value
pass

def action(i):
print(i)

for i in range(10):
ret = pool.run(action, (i,), callback)

time.sleep(3)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
pool.terminate()

线程池注意事项:

即使线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池需要注意线程池的大小,注意并发风险、死锁、资源不足和线程泄露等问题。

  1. 线程池大小。多线程应用并非是越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理,线程数据与CPU数理相适应即可。 如果线程运行时出现阻塞现象,可相应的增加池的大小;若有必要可采用自适应算法来动态调整线程池的大小,以提高CPU的有效利用率和系统的整体性能。
  2. 并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
  3. 线程泄漏。这是比较严重且麻烦的问题,当任务执行完毕而线程没有返回池中就会发生线程池泄漏。

补充:

上下文管理器,待总结。。。

仅供学习参考:懒执事 » Python021-线程|多线程|线程池

分享到:更多 ()