引言:

线程之间经常需要协同工作,通过某种技术,让一个线程访问某些数据时,其它线程不能访问这些数据,直到该线程完成对数据的操作。这些技术包括临界区(Critical Section),互斥量(Mutex),信号量(Semaphore),事件Event等。


Event

threading库中的event对象通过使用内部一个flag标记,通过flag的True或者False的变化来进行操作。

名称 含义set( )标记设置为Trueclear( )标记设置为Falseis_set( )标记是否为Truewait(timeout=None)设置等待标记为True的时长,None为无限等待。等到返回True,等不到返回False

fromthreadingimportThread,Eventimporttimedefcreditor(event:Event):print("什么时候还我钱")event.wait()print("我已经等了很长时间了")defdebtor(event:Event,count=10):print("可以宽裕几天吗?")money=[]whileTrue:print("先还你100")time.sleep(0.5)money.append(1)iflen(money)>count:event.set()breakprint("我已经还完你的钱了")event=Event()c=Thread(target=creditor,args=(event,))d=Thread(target=debtor,args=(event,))c.start()d.start()

运行结果如下所示:

可以看到creditor函数中因为event.wait( )线程进入等待状态,此时debtor线程进入运行,当满足条件时event.set( )将标记设置为True,creditor线程开始运行。谁wait就是等到flag变为True,或等到超时变为False。不限制等待的个数。

wait的使用

fromthreadingimportEvent,ThreaddefWait(event:Event,interval):whilenotevent.wait(interval):print("waitingforyou")e=Event()Thread(target=Wait,args=(e,3)).start()e.wait(10)e.set()print("mainexit")

主线程一开始就wait 10s,Waiting线程等待3s返回False,进入循环打印"waiting for you",重复3次,然后主线程set了,这时候Waiting线程变为True,不再进入循环。


Lock

凡是存在资源争用的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源

现在要生产10个杯子,由10个工人开始生产

importthreadingimporttimecups=[]defworker(count=10):print("我是{},我开始生产了".format(threading.current_thread().name))flag=FalsewhileTrue:iflen(cups)>count:flag=Truetime.sleep(0.05)ifnotflag:cups.append(1)ifflag:breakprint("finished.cups={}".format(len(cups)))for_inrange(10):threading.Thread(target=worker,args=(1000,)).start()

运行结果如下图所示:

我们明明只需要到1000就会break,但是结果却到了1010个,这就是因为有10个线程,其中每个线程都在增加,但是增加后的数目,其他线程并不会知道(每个线程通过len函数拿到数量,但是刚拿到数字,其他线程就立即更新了)

这个时候我们就需要锁lock来实现了,一旦线程获得锁,其他试图获取锁的线程将被阻塞

名称 含义acquire(blocking=True,timeout=-1)默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。成功获取锁,返回True,否则返回Falserelease( )释放锁。可以从任何线程释放。已上锁的锁,会抛出RuntimeError异常

加锁的实现:

importthreadingimporttimecups=[]lock=threading.Lock()defworker(count=10):print("我是{},我开始生产了".format(threading.current_thread().name))flag=FalsewhileTrue:lock.acquire()iflen(cups)>=count:flag=Truetime.sleep(0.005)ifnotflag:cups.append(1)lock.release()ifflag:breakprint("finished,cups={}".format(len(cups)))for_inrange(10):threading.Thread(target=worker,args=(1000,)).start()

运行结果如图所示:

一般来说加锁后还需要一些代码实现,在释放锁之前还有可能抛出异常,一旦出现异常,锁无法释放,但是当前这个线程会因为这个异常而终止,这样会产生死锁,因此使用时要使用如下的方法:

1,使用try...finally语句保证锁的释放

2,with安全上下文管理(锁对象支持上下文管理)


计数器类,用来加,减。

importthreadingimporttimeclassCounter:def__init__(self):self._val=0self.__lock=threading.Lock()@propertydefvalue(self):returnself._valdefinc(self):try:self.__lock.acquire()self._val+=1finally:self.__lock.release()defdec(self):withself.__lock:self._val-=1defrun(c:Counter,count=100):for_inrange(count):foriinrange(-50,50):ifi<0:c.dec()else:c.inc()c=Counter()c1=10c2=1000foriinrange(c1):threading.Thread(target=run,args=(c,c2)).start()whileTrue:ifthreading.active_count()==1:print(c.value)break

启动了10个线程,1000次从-50到50进行加减,最后得到0,如果没有加锁处理的话,得到的结果未必是自己想要的。


锁的使用场景:

锁适用于访问和修改同一个资源的时候,引起资源争用的情况下。使用锁的注意事项:

1,少用锁,除非有必要。多线程访问加锁的资源时,由于锁的存在,实际就变成了串行。

2,加锁时间越短越好,不需要就立即释放锁。

3,一定要避免死锁,使用with或者try...finally。


非阻塞锁使用

importthreadingimporttimedefworker(tasks):fortaskintasks:time.sleep(0.001)iftask.lock.acquire(False):print("{}{}begintostart".format(threading.current_thread(),task.name))else:print("{}{}isworking".format(threading.current_thread(),task.name))classTask:def__init__(self,name):self.name=nameself.lock=threading.Lock()tasks=[Task('task-{}'.format(x))forxinrange(10)]foriinrange(5):threading.Thread(target=worker,name="worker-{}".format(i),args=(tasks,)).start()

运行结果如下图所示:

总共开启了5个线程,每个线程处理10个任务,因为在if语句里面,task.lock.acquire(False),所以每个线程只有拿到锁是True,其他的线程不会阻塞会返回False。打印"is working"。