threading 之 semaphore信号量
import threadingimport loggingimport timeFORMAT = '%(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)def worker(s:threading.Semaphore): logging.info('in sub thread') logging.info(s.acquire()) # 获取信号量,计数器 -1 logging.info('sub thread over')s = threading.Semaphore(3) # 创建3个信号量计数器logging.info(s.acquire())print(s._value) # 看看现在信号量的内的数值是多少logging.info(s.acquire())print(s._value)logging.info(s.acquire())print(s._value)threading.Thread(target=worker, args=(s, )).start()time.sleep(2)logging.info(s.acquire(False)) # 不阻塞,若获取不到信号量,则为Falselogging.info(s.acquire(timeout=10)) # 设置超时时间,等待超时时间过了还未获取到信号,返回值为False# 释放logging.info('released')s.release() # 释放信号量,计数器i + 1
简单的资源池演示
import threadingimport loggingimport randomFORMAT = '%(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)class Conn: def __init__(self, name): self.name = name def __str__(self): return self.nameclass Pool: def __init__(self, count:int): self.count = count self.pool = [ self._connect('conn-{}'.format(x)) for x in range(self.count)] def _connect(self, conn_name): return Conn(conn_name) def get_conn(self): conn = self.pool.pop() return conn def return_conn(self, conn:Conn): self.pool.append(conn)pool = Pool(3)def worker(pool:Pool): conn = pool.get_conn() logging.info(conn) threading.Event().wait(random.randint(1,4)) pool.return_conn(conn)for i in range(6): threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
使用semaphore来完善代码
import threadingimport loggingimport randomFORMAT = '%(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)class Conn: def __init__(self, name): self.name = name def __str__(self): return self.nameclass Pool: def __init__(self, count:int): self.count = count self.pool = [ self._connect('conn-{}'.format(x)) for x in range(self.count)] self.semahore = threading.Semaphore(count) def _connect(self, conn_name): return Conn(conn_name) def get_conn(self): self.semahore.acquire() conn = self.pool.pop() return conn def return_conn(self, conn:Conn): self.pool.append(conn) self.semahore.release()pool = Pool(3)def worker(pool:Pool): conn = pool.get_conn() logging.info(conn) threading.Event().wait(random.randint(1,4)) pool.return_conn(conn)for i in range(6): threading.Thread(target=worker, name='worker-{}'.format(i), args=(pool,)).start()
关于信号量release超出初始值范围的问题
当我们使用semaphore的时候,如果还未acquire,就release了,会产生什么问题?
会产生是的信号量的值+1,超出了原本设置semaphore的初始值,下面的例子说明了这个问题
import threadingimport loggingFORMAT = '%(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)sema = threading.Semaphore(3)logging.warning(sema.__dict__)for _ in range(3): sema.acquire()logging.warning('-----')logging.warning(sema.__dict__)for _ in range(4): sema.release()logging.warning(sema.__dict__)for _ in range(3): sema.acquire()logging.warning('--------')logging.warning(sema.__dict__)sema.acquire()logging.warning('======')logging.warning(sema.__dict__)
所以这个这样的问题,可以使用BoundedSemaphore类来实现有界的信号量,若relase超出了初始值的范围,会抛出ValueError异常
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。