python解决线程同步方案有哪些
先提到线程同步是个什么,概念是什么,就是线程通讯中通过使用某种技术访问数据时,而一旦此线程访问到,其他线程也就不能访问了,直到该线程对数据完成操作才结束。 Event事件是一种实现方式:通过内部的标记看看是不是变化,也就是true or false了, 将set(),clear(),is_set(),为true,wait(timeout=None)此种设置true的时长,等到返回false,不等到超时返回false,无线等待为None, 来看一个wait的使用:
from threading import Event, Threadimport logginglogging.basicConfig(level=logging.INFO)def A(event:Event, interval:int): while not event.wait(interval): # 要么true or false logging.info('hello')e = Event()Thread(target=A, args=(e, 3)).start()e.wait(8)e.set()print('end--------------')
使用锁Lock解决数据资源在争抢,从而使资源有效利用。
lock的方法:
acquire(blocking=True,timeout=-1),默认阻塞,阻塞设置超时时间,非阻塞,timeout禁止使用,成功获取锁,返回True,否则False。
有阻塞就有释放 ,解开锁,release(),从线程释放锁,上锁的锁重置为unloced未上锁调用,抛出RuntimeError异常。
import threadingfrom threading import Thread, Lockimport loggingimport timeFORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []lock = Lock()def worker(count=10): logging.info("I'm working for U.") flag = False while True:
lock.acquire() # 获取锁 if len(cups) >= count: flag = True time.sleep(0.0001) if not flag: cups.append(1) if flag: breaklogging.info('I finished. cups = {}'.format(len(cups)))
for _ in range(10): Thread(target=worker, args=(1000,)).start()
使用锁的过程中,总是不经意加上锁,出现死锁的产生,出现了死锁,如何解决呢?
使用try finally 将锁释放,另一种使用with上下文管理。
锁的使用场景在于应该少用锁,还要就是如若上锁,将锁的使用时间缩短,避免时间太长而出现无法释放锁的结果。
可重入锁Lock,
import threadingimport timelock = threading.RLock()print(lock.acquire())print('------------')print(lock.acquire(blocking=False))print(lock.acquire())print(lock.acquire(timeout=3.55))print(lock.acquire(blocking=False))#print(lock.acquire(blocking=False, timeout=10)) # 异常lock.release()lock.release()lock.release()lock.release()print('main thread {}'.format(threading.current_thread().ident))print("lock in main thread {}".format(lock)) # 注意观察lock对象的信息lock.release()#lock.release() #多了一次print('===============')print()print(lock.acquire(blocking=False)) # 1次#threading.Timer(3, lambda x:x.release(), args=(lock,)).start() # 跨线程了,异常lock.release()print('~~~~~~~~~~~~~~~~~')print()# 测试多线程print(lock.acquire())def sub(l): print('{}: {}'.format(threading.current_thread(), l.acquire())) # 阻塞 print('{}: {}'.format(threading.current_thread(), l.acquire(False))) print('lock in sub thread {}'.format(lock)) l.release() print('sub 1') l.release() print('sub 2') # l.release() # 多了一次threading.Timer(2, sub, args=(lock,)).start() # 传入同一个lock对象print('++++++++++++++++++++++')print()print(lock.acquire())lock.release()time.sleep(5)print('释放主线程锁')lock.release()
使用构造方法Condition(lock=None),默认是Rloc,
具体方法为;
acquire(*args),获取锁
wait(self,timeout=None),等待或超时
notify(n=1),唤醒线程,没有等待就没有任何操作,指线程
notify_all(),唤醒所有等待的线程。
Condition主要用于生产者和消费者模型中,解决匹配的问题。
使用方式:先获取acquire,使用完了要释放release,避免死锁最好使用with上下文;生产者和消费者可以使用notify and notify_all。
如下例子:
from threading import Thread, Eventimport loggingimport randomFORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)## 此例只是为了演示,不考虑线程安全问题class Dispatcher: def __init__(self): self.data = None self.event = Event() # event只是为了使用方便,与逻辑无关 def produce(self, total): for _ in range(total): data = random.randint(0,100) logging.info(data) self.data = data self.event.wait(1) self.event.set() def consume(self): while not self.event.is_set(): data = self.data logging.info("recieved {}".format(data)) self.data = None self.event.wait(0.5)d = Dispatcher()p = Thread(target=d.produce, args=(10,), name='producer')c = Thread(target=d.consume, name='consumer')c.start()p.start()
这里代码会有缺陷:优化如下:
from threading import Thread, Event, Conditionimport loggingimport randomFORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)## 此例只是为了演示,不考虑线程安全问题class Dispatcher: def __init__(self): self.data = None self.event = Event() # event只是为了使用方便,与逻辑无关 self.cond = Condition() def produce(self, total): for _ in range(total): data = random.randint(0,100) with self.cond: logging.info(data) self.data = data self.cond.notify_all() self.event.wait(1) # 模拟产生数据速度 self.event.set() def consume(self): while not self.event.is_set(): with self.cond: self.cond.wait() # 阻塞等通知 logging.info("received {}".format(self.data)) self.event.wait(0.5) # 模拟消费的速度d = Dispatcher()p = Thread(target=d.produce, args=(10,), name='producer')# 增加消费者for i in range(5): c = Thread(target=d.consume, name='consumer-{}'.format(i)) c.start()p.start()
Barrier的使用:
方法如下:
Barrier(parties, action=None,
timeout=None),构建barrier对象,timeout未指定的默认值;
n_waiting ,当前barrier等待的线程数。;
parties ,需要等待
wait(timeout=None),wait方法设置超时并超时发送,barrie处于broken状态。
而broken的状态方法:
broken,打开状态,返回true;
abort(),barrier在broken状态中,wait等待的线程会抛出BrokenBarrierError异常,直到reset恢复barrier;
reset(),恢复barrier,重新开始拦截。
barrier不做演示:
还有semaphore信号量,每次acquire时,都会减一,到0时的线程再到release后,大于0,恢复阻塞的线程。
方法:
Semaphore(value=1) 构造方法,alue小于0,抛ValueError异常;
acquire(blocking=True, timeout=None) 获取信号量,计数器减1,获取成功返回True;
release() 释放信号量,计数器加1。
使用信号量处理时,需要注意release超界问题,边界问题,其实,在使用中,python有GIL的存在,有的多线程就变成线程安全的,注意一点,但实际上它们并不是线程安全类型。因此我们在使用中要具体场景具体分析具体使用。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。