返回目录

 

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

本篇索引

(1)线程基本概念

(2)threading模块

(3)线程间同步原语资源

(4)queue

 

  (1)线程基本概念

当应用程序需要并发执行多个任务时,可以使用线程。多个线程(thread)同时运行在一个进程(process)的内部, 它们可以共享访问本进程内的全局变量数据和资源。各个线程之间的调度由操作系统负责, 具体做法是:给每个线程分配一个小的时间片,并在所有的线程之间循环切换。在具有多核的CPU上, 操作系统有时会安排尽可能使用每个CPU,从而并行执行线程。

并发编程的复杂性在于,多个线程可能同时更新一个数据,导致数据的损坏或不一致(术语叫做:竞争), 要解决这个问题,必须使用互斥锁或其他类似的同步手段保护这些数据。

Python解释器使用了内部的GIL(Global Interpreter Lock,全局解释器锁), 这限制了Python程序只能在一个处理器上运行。如果一个应用程序的大部分是I/O密集型的, 那么使用线程是没有问题的。而如果是CPU密集型的,那使用多个线程没任何好处,还会降低运行速度。 因此对于计算密集型的任务,最好使用C扩展模块或multiprocessing模块来代替。 C扩展具有释放解释器锁和并行运行的选项,前提是释放锁时不与解释器进行交互。 multiprocessing模块将工作分派给不受锁限制的单独子进程。

使用多线程编程还要注意一个问题:开的线程的数量级不能太大。例如,一台使用线程的网络服务器对于100个线程工作情况良好, 但如果增加到10000个线程,性能就会变得非常糟糕。因为每个线程都需要有自己的系统资源, 而且还会产生线程上下文切换、锁和其他相关开销,算下来是个不小的开销。 对这种I/O密集型应用,比较常见的做法是将程序编写为:异步事件处理机制的结构,比如“协程”。 使用异步和协程的方式编程,可以比较轻松地处理诸如10000个连接的情况。

没有任何方法可以强制终止或挂起其他线程!这是设计上的原因。因此,线程只能自己挂起或自己终止。

 

 

 

  (2)threading模块

threading模块提供Thread类和各种同步原语,用于编写多线程的程序。 threading模块可创建:Thread对象、Timer对象、 Lock对象、RLock对象等。

 

● Thread对象

Thread类用于表示单独的控制线程,创建新线程的语法如下:

Thread(group=None, target=None, name=None, args=(), kwargs={})

此函数创建一个新的线程实例,target是一个可调用对象(线程启动时,run()方法将调用此对象), name是线程名称,默认将创建一个名为 “Target-N” 格式的唯一名称。 argskwargs是传递给target函数的参数元组、参数字典。

 

Thread实例支持以下属性和方法

属性或方法 类型 说明
name 属性 线程名称,这个字符串用于唯一标识线程。
ident 属性 整数线程标识符,如果线程尚未启动,它的值为None。
daemon 属性 布尔值,True表示为后台线程。它的初始值从创建线程的线程继承而来。 主线程(控制线程)不是后台线程(主线程的daemon为 False)。 通常 Python 解释器退出之前,会等待所有线程终止。但不会等待daemon为 True的线程。 当主线程结束后,如果其他线程的的daemon都为True,Python解释器将不等待那些线程, 整个Python程序将即时退出。
t.start() 方法 在主线程中,调用此方法启动线程。
t.run() 方法 线程启动时,将自动调用此方法。它将调用先前创建线程对象时,由target指定的目标函数。 可以在Thread的子类中,重新定义此方法。
t.join([timeout]) 方法 在主线程中调用此方法,功能是等待直到线程实例 t 终止或超时为止。timeout是一个浮点数, 单位为“秒”。在线程启动之前不能调用此函数,否则会报错。
t.is_alive() 方法 如果线程是活动的,返回True,否则返回False。从start()方法返回的那一刻开始, 线程就是活动的,直到它的run()方法终止为止。

 

● 线程使用实例

通常 Python 解释器退出之前,会等待所有线程终止。但是,若将创建的线程的daemon设置为 True, 会使解释器在主线程结束后立即退出程序,这些daemon为 True的线程将即时被销毁。

下例中,线程 t 每5秒从后台激活激活运行一次;30秒后,主线程结束,整个Python程序退出:

import threading
import time

def clock(interval):
    while True:
        print("The time is %s" % time.ctime())
        time.sleep(interval)
        
t = threading.Thread(target=clock, args=(5,))
t.daemon = True
t.start()

time.sleep(30)

下例为将同一个线程定义为一个 Thread 的子类:

import threading
import time

class ClockThread(threading.Thread):
    def __init__(self, interval):
        threading.Thread.__init__(self)
        self.daemon = True
        self.interval = interval
    def run(self):
        while True:
            print("The time is %s" % time.ctime())
            time.sleep(self.interval)          

t = ClockThread(5)
t.start()

time.sleep(30)  

 

 

● Timer对象

Timer对象用于在稍后某个时间执行一个函数,调用语法如下:

Timer(interval, func [,args [,kwargs]])

此函数创建定时器对象,在interval秒之后运行函数funcargskwargs是传递给func函数的参数元组、参数字典。

 

Timer实例支持以下方法:

属性或方法 类型 说明
t.start() 方法 启动定时器,func函数将在指定间隔时间后执行。
t.cancel() 方法 如果函数尚未执行,取消定时器。

 

 

● 实用工具函数

函数 说明
active_count() 返回当前活动的Thread对象数量。
current_thread() 返回调用者的线程对象。
enumerate() 列出当前所有活动的Thread对象。
local() 返回local对象,用于保存线程本地的数据。应该保证此对象在每个线程中是唯一的。
setprofile(func) 设置一个配置文件函数,用于已创建的所有线程。 func在每个线程开始运行之前被传递给 sys.setprofile()函数。
settrace(func) 设置一个跟踪函数,用于已创建的所有线程。 func在每个线程开始运行前被传递给 sys.settrace()函数。
stack_size([size]) 返回创建新线程时使用的栈大小。可选整数参数size表示创建新线程时使用的栈大小。 size的值可以是 32768(32 KB) 或更大,而且是 4096 的倍数。如果系统上不支持此操作, 将引发 ThreadError 异常。

 

 

 

  (3)线程间同步原语资源

● 锁(Lock)

锁(或称为“互斥锁”)有2个状态:已锁定、未锁定。如果锁处于已锁定状态,尝试获取锁的线程将被阻塞, 直到锁被释放为止。如果有多个线程等待获取锁,当锁被释放时,只有一个线程能获得它,具体哪个线程随机。 创建锁的语法如下,初始状态为“未锁定”:

threading.Lock()

 

Lock实例支持以下方法

实例方法 说明
lock.acquire([blocking]) 获取锁,成功则返回 True。blocking参数默认为 True,当锁为已锁定时,则阻塞本线程。 若blocking设为 False,当无法获取锁时,将立即返回 False,不阻塞。
lock.realease() 释放一个锁,当锁处于“未锁定”状态时、或从与原本调用acquire()方法的线程不同的线程调用此方法, 将出现错误。

 

 

● 可重入锁(RLock)

可重入锁(RLock)类似于Lock对象,但同一个线程可以多次获取它。 这允许拥有锁的线程执行嵌套的acquire()release()操作。 在这种情况下,只有最外层的release()操作才能将锁重置为“未锁定”状态。

创建可重入锁的语法如下:

threading.RLock()

 

RLock实例支持以下方法

实例方法 说明
rlock.acquire([blocking]) 获取锁,成功获取则返回 True,而且递归级别被设置为1。如果此线程已拥有锁, 则锁的递归级别加1,而且立即返回。blocking的含义同上。
rlock.realease() 通过减少锁的递归级别来释放它。如果在减值后递归基本为0,锁将被置位“未锁定”状态。 否则,所继续保持“已锁定”状态。只能由目前拥有锁的线程来调用此函数

 

 

● 信号量(Semaphore)

信号量是一个基于计数器的同步原语,每次调用acquire()方法时此计数器减1, 每次调用release()方法时此计数器加1。如果计数器为0,acquire()方法将会阻塞。 直到其他线程调用release()方法为止。

创建信号量对象的语法如下(value是计数器初始值,默认为1):

threading.Semaphore([value])

以下创建有边界的信号量对象(BoundedSemaphore),区别是release()操作的次数不能超过acquire()次数:

threading.BoundedSemaphore([value])

 

Semaphore实例支持以下方法

实例方法 说明
s.acquire([blocking]) 获取信号量。若计数值大于0,则减1,然后立即返回。 若计数值为0,此方法将阻塞,直到另一个线程调用release()方法为止。 blocking的含义同上。
s.realease() 通过将内部计数器的值加1来释放一个信号量。如果计数值为0,而且另一线程在等待, 则该线程将被唤醒。

 

“信号量”和“互斥锁”之间的差别在于,信号量可用于发信号。例如:可以从不同线程调用 acquire()release()方法,以便在生产者和消费者线程之间进行通信。 也可以用后面的“条件变量”来达成。

 

 

● 事件(Event)

“事件”用于线程间通信。一个线程发出事件,一个或多个其他线程等待它。 Event实例内部管理着一个标志,可以用set()方法将它设为True, clear()方法设为False,wait()方法将阻塞线程,知道标志为True。

创建事件的语法如下:

threading.Event()

 

Event实例支持以下方法

实例方法 说明
e.is_set() 只有当内部标志为True时才返回True。
e.set() 将内部标志置为True。等待它变为True的所有线程都将被唤醒。
e.clear() 将内部标志置为False。
e.wait([timeout]) 阻塞直到内部标志为True。如果进入时内部标志为True,此方法立即返回。 timeout是一个浮点数,单位为秒,指定超时期限。

“事件”不适合用于生产者-消费者问题,因为事件只有True和False两种状态, 处理过程中信号可能丢失。最好使用“条件变量”。

 

 

● 条件变量(Condition)

“条件变量”是另一种同步原语,典型用于生产者-消费者问题。

创建条件变量的语法如下,lock是可选的Lock或RLock实例,若缺省则创建新的RLock实例

threading.Condition([lock])

 

Condition实例支持以下方法

实例方法 说明
cv.acquire(*args) 获取底层锁。此方法将调用底层锁上的acquire(*args)方法。
cv.realease() 释放底层锁。此方法将调用底层锁上对应的release()方法。
cv.wait([timeout]) 等待直到获得通知或出现超时为止。此方法在调用线程已经获取锁之后调用。 调用时,将释放底层锁,而且线程将进入后随眠状态,直到另一个线程在条件变量上执行 notify()notifyAll()将其唤醒为止。在线程被唤醒之后, 线程将重新获取锁,方法也会返回。 timeout是一个浮点数,单位为秒,指定超时期限。
cv.notify([n]) 唤醒一个或多个等待此条件变量的线程。此方法只在调用线程已获取条件变量内部锁之后调用。 如果没有正在等待的线程,它就什么也不做。n指定要唤醒的线程数量,默认为1。
cv.notify_all() 唤醒所有等待此条件的线程。

 

下面为使用条件变量的模板:

import threading

cv = threading.Condition()

def producer():
    while True:
        cv.acquire()
        produce_item()
        cv.notify()
        cv.release()
        
def consumer():
    while True:
        cv.acquire()
        while not item_is_available():
            cv.wait()   # 等待直到有项出现
        cv.release()
        consume_item()

如果存在多个线程等待同一个条件,notify()操作可能唤醒他们中的一个或多个。 因此某个线程被唤醒后,可能发现它等待的条件不存在了,所以在consumer()函数中使用while循环, 如果线程醒来,但是生成的项已经消失,它就会回去等待下一个信号。

 

 

● 使用线程间资源的注意点

使用以上Lock等之类的线程间资源时,必须非常小心,依赖锁的代码应保证出现异常时正确地释放锁 否则可能导致死锁,典型的代码如下所示:

try:
    lock.acquire()
    # 关键部分
    ......
finally:
    lock.release()

使用上下文管理协议(with),更加简洁:

with lock:
    # 关键部分
    ......

另外,编写代码时,一般应该避免同时获取多个锁。

 

 

 

  (4)queue

queue模块实现了各种“多生产者-多消费者队列”,可用于在执行的多个线程间安全地交换信息。 一般来说,线程间通信最佳的方式就是使用queue,者比前面的诸如“锁”之类的资源都要好用。

queue模块定义了以下3种不同的队列类型,创建语法与说明如下:

创建函数 说明
Queue([maxsize]) 创建一个FIFO(先进先出)队列。maxsize是队列中可放入项的最大数量, 缺省或置0则队列大小为无穷大。
LifoQueue([maxsize]) 创建一个LIFO(后进先出)队列。(即:堆栈)
PriorityQueue([maxsize]) 创建一个优先级队列,使用这种队列时,项应该是(priority, data)形式的元组, 其中priority是一个数字,数字越大优先级越高。

 

队列支持以下实例方法:

实例方法 说明
q.qsize() 返回队列的大小,因为其他线程可能正在更新队列,此方法返回的数字可能不可靠。
q.empty() 如果队列为空,则返回True,否则返回False。
q.full() 如果队列为满,则返回True,否则返回False。
q.put(item [,block [,timeout]]) item放入队列,如果block为True(默认), 则调用者将阻塞直到队列中出现可用的空闲位置为止;如block为False, 队列满时将引发Full异常。timeout提供可选的超时值,单位为秒, 如果超时则引发Full异常。
q.put_nowait(item) 等价于q.put(item, False)方法。
q.get([block [,timeout]]) 从队列中取出一项返回。如果block为True(默认), 则调用者将阻塞直到队列中出现可取出的项为止;如block为False, 队列空时将引发Full异常。timeout提供可选的超时值,单位为秒, 如果超时则引发Full异常。
q.get_nowait() 等价于q.get(False)方法。
q.task_done() 队列的消费者用来指示对于项的处理已结束。如果使用此方法, 那么从队列中每取出一项都应该手动调用一次。 此方法主要是辅助q.join()方法用的。
q.join() 阻塞直到队列中所有的项均被取出和处理完为止。当为队列中的每一项都调用过了一次 q.task_done()方法,此方法将会直接返回。

 

 

● 使用队列的示例

注意下例中q.task_done()q.join()的用法, 它们将使得线程在处理完所有项后关闭。

import threading
from queue import Queue

class WorkerThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)
        self.input_queue = Queue()
        
    def send(self, item):
        self.input_queue.put(item)
        
    def close(self):
        self.input_queue.put(None)
        self.input_queue.join()
        
    def run(self):
        while True:
            item = self.input_queue.get()
            if item is None:
                break
            # 处理项
            print(item)
            self.input_queue.task_done()        
        # 完成
        self.input_queue.task_done()
        return
        
        
# 使用示例
w = WorkerThread()
w.start()
w.send('hello')
w.send('world')
w.close()

 

 

 

 

返回目录

 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄