线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。进程是资源分配的最小单位,线程是CPU调度的最小单位,每一个进程中至少有一个线程,线程可与属于同一进程的其它线程共享进程所拥有的全部资源,但是其本身基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。

Threading模块提供线程相关的操作,Threading模块包含Thread,Lock,RLock,Event,Queue等组件;multiprocess模块完全模仿了threading模块的接口,二者在使用时,时极其相似的。

1、Thread创建线程的两种方式:

示例1:

import timefrom threading import Threaddef func(i): time.sleep(1) print("hello : %s"%i)thread_l = []# 开启多线程for i in range(10): t = Thread(target=func,args=(i,)) #实例化线程对象 t.start() # 激活线程 thread_l.append(t)# 异步开启阻塞for j in thread_l: j.join() # 阻塞主线程,子线程执行完毕之后向下执行主线程代码print("主线程")

结果:

hello : 2hello : 0hello : 1hello : 3hello : 5hello : 4hello : 7hello : 6hello : 9hello : 8主线程

示例2:使用类继承的方式创建线程

import timefrom threading import Threadclass MyThread(Thread): # 继承Thread类 count = 0 # 子线程间会共享此静态属性 def __init__(self,arg1,arg2): # 通过init方法传递参数 super().__init__() self.arg1 = arg1 self.arg2 = arg2 def run(self): # 必须实现run方法 MyThread.count += 1 time.sleep(1) print("%s,hello!%s"%(self.arg1,self.arg2))thread_l = []for i in range(10): t = MyThread('eric','jonny') t.start() thread_l.append(t)for j in thread_l: j.join()print("conut: %s"%MyThread.count)结果:1,hello!jonny0,hello!jonny5,hello!jonny4,hello!jonny3,hello!jonny2,hello!jonny6,hello!jonny9,hello!jonny7,hello!jonny8,hello!jonnyconut: 10Thread的主要方法:

t.start() :激活线程
t.join():阻塞(等待子线程执行完毕,在向下执行),在每次激活线程后阻塞会使线程变为同步,所以要在线程激活完毕之后阻塞。
t.name :设置或获取线程名
t.getName():获取线程名
t.setName(NAME):设置线程名
t.is_alive() :判断线程是否激活
t.setDaemon() :设置守护线程,在激活线程之前设置,默认值为False
t.isDaemon() : 判断是否为守护线程

2、Lock与RLock

同一个进程内的线程是数据共享的,线程的GIL(全局解释性)锁是锁的线程调用CPU的时间,在第一个线程调用CPU操作共享数据的时候,时间轮转至第二个线程,第二个线程也要操作共享数据,这样就导致了数据的不一致,这是因为GIL不锁数据,这种情况下,线程锁的出现就能解决这个这个问题。
示例1:GIL锁发挥作用

from threading import Threaddef func(): global n n -= 1n = 1000thread_l = []for i in range(100): t = Thread(target=func) t.start() thread_l.append(t)for j in thread_l: j.join()print(n)结果是:900

示例2:时间片轮转,GIL锁失效

import timefrom threading import Threaddef func(): global n # n -= 1 temp = n # 从进程中获取n time.sleep(0.01) # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果 n = temp -1 # 得到结果,再将修改过的数据返回给进程n = 1000thread_l = []for i in range(100): t = Thread(target=func) t.start() thread_l.append(t)for j in thread_l: j.join()print(n)结果是:998

示例3:互斥锁Lock,对数据加锁

import timefrom threading import Threadfrom threading import Lockdef func(): global n # n -= 1 lock.acquire() # 上锁 temp = n # 从进程中获取n time.sleep(0.01) # 每个线程调用CPU的时间是很短的,制造时间片轮转的效果 n = temp -1 # 得到结果,再将修改过的数据返回给进程 lock.release() # 释放n = 1000lock = Lock() # 实例化锁对象thread_l = []for i in range(100): t = Thread(target=func) t.start() thread_l.append(t)for j in thread_l: j.join()print(n)结果是:900

互斥锁Lock后使数据一致,具有安全性,但是也带来了新的问题,因为锁的缘故,每一个线程都是串行的拿到锁,在释放;整个程序运行变成串行,效率降低。

示例4:递归锁Rlock
Lock在同一线程中只能被acquire一次,下一次的acquire必须等待release之后才可以;而RLock允许在同一线程中被多次acquire,但是acquire和release必须是成对存在。

from threading import Lockfrom threading import RLocklock = Lock() # 实例化出互斥锁对象lock.acquire()lock.acquire() # 在第二次acquire时,程序阻塞等待release之后拿到锁print("死锁")lock.release()lock.release()lock1 = RLock() # 实例化出递归锁对象lock1.acquire()lock1.acquire() # 可被多次acquireprint("running")lock1.release()lock1.release() # acquire与release成对出现

在多线程并发的情况下,同一个线程中,如果出现多次acquire,就可能发生死锁现象,使用RLock就不会出现死锁问题

3、Semaphore

线程的信号量与进程的信号量使用基本一致;信号量可以允许指定数量的线程操作上锁的数据,即一把锁有多个钥匙。对与有信号量限制的程序来说,信号量有几个任务就开启几个线程,在加锁阶段会限制程序运行数量,并不影响其它代码的并发。
示例:

import randomimport timefrom threading import Semaphorefrom threading import Threaddef sing(i,sem): sem.acquire() # 加锁 print('%s enter the ktv'%i) time.sleep(random.randint(1,10)) print('%s leave the ktv'%i) sem.release() # 释放sem = Semaphore(4)for i in range(20): t = Thread(target=sing, args=(i, sem)) t.start()4、Event

事件:线程之间状态标记通信,使用方法与进程的基本一致
主要方法:
e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

示例:连接数据库

'''连接数据库每0.5秒连一次,连接三次用事件来标志数据库的连接情况如果连接成功,显示成功否则报错'''import timeimport randomfrom threading import Threadfrom threading import Event# 模拟检查连接,检查连接成功使事件标志位为非阻塞def check_conn(): time.sleep(random.randint(1,3)) e.set()# 在还没检查成功前等待,接到非阻塞信号则连接数据库def conn_mysql(): count = 1 while not e.is_set(): if count > 3: raise TimeoutError print("尝试第 %s 次连接" % count) count += 1 e.wait(0.5) print("连接成功")e = Event() # 实例化事件对象Thread(target=check_conn).start()Thread(target=conn_mysql).start()5、Timer

定时器:定时开启一个线程,执行一个任务
示例:

from threading import Timerdef func(): print("hello")'''必须有两个参数第一个是时间,单位为秒第二个是要执行的函数'''Timer(1,func).start()6、Condition

条件变量:条件包含递归锁RLock和事件Event中的wait()方法的功能。
五个方法:
acquire(): 递归锁
release(): 释放锁
wait(timeout): 等待通知,或者等到设定的超时时间;才会被唤醒继续运行。wait()必须在已获得Lock前提下才能调用,否则会触发RuntimeError异常。
notify(n=1): 通知其他线程,传入的参数必须时int类型的,那些挂起的线程接到这个通知之后会开始运行,默认是通知一个正等待该condition的线程,最多则唤醒n个等待的线程。notify()必须在已获得Lock前提下才能调用,否则会触发RuntimeError。notify()不会主动释放Lock。
notifyAll(): 如果wait状态线程比较多,notifyAll的作用就是通知所有线程

示例:

from threading import Conditionfrom threading import Threaddef run(n): con.acquire() con.wait() print("run the thread: %s"%n) con.release()if __name__ == '__main__': con = Condition() for i in range(10): t = Thread(target=run,args=(i,)) t.start() while True: msg = input(">>> ") if msg == 'q': break con.acquire() # 递归锁 if msg == 'all': con.notify_all() # 放行所有线程 else: con.notify(int(msg)) # 传递信号,放行线程,参数是int类型的 con.release() # 释放锁7、Queue模块

queue模块就是线程的队列,它是数据安全的。
主要方法:
q.put(1) # 将传入的数据放入队列
q.get() # 根据对象所属类的不同,取出队列中的数据
q.join() # 等队列为空时,在执行别的操作
q.qsize() # 返回队列的大小,不一定准确
q.empty() # 队列为空时,返回True,否则返回False,不一定准确
q.full() # 队列满时,返回True,否则返回False,不一定准确

Queue类的使用:先进先出

import queueq = queue.Queue() # 实例化一个队列对象,可给出队列长度,先进先出q.put(1) # 将传入的数据放入队列q.put(2)print(q.get()) # 先进先出,取出队列的第一个值

LifoQueue类的主要方法:后进先出

import queuelfq = queue.LifoQueue() # 实例化一个对象,可给出长度,后进先出lfq.put(1)lfq.put(2)print(lfq.get()) #后进先出,取出2PriorityQueue类的主要方法:优先级import queuepq = queue.PriorityQueue() # 实例化一个队列对象,优先级队列,优先级值越小越优先pq.put((10,'a'))pq.put((5,'b'))pq.put((1,'c'))pq.put((15,'d'))for i in range(4): print(pq.get())结果:(1, 'c')(5, 'b')(10, 'a')(15, 'd')8、concurrent模块之futures

concurrent是用来操作池的模块,这个模块可创建进程池和线程池,其使用方法完全一致,统一了入口和方法,使用池更便捷,且python内置,导入便可使用。

主要方法:
submit(FUNC,ARGS):创建线程对象和激活线程,FUNC是要执行的任务,ARGS是参数
shutdown():shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束
result():result方法取线程执行的函数返回值
map(FUNC,iter):map方法异步执行,需传入要执行的任务FUNC,以及一个可迭代对象iter,map方法无返回值
add_done_callback(call):回调函数

示例1:

import timeimport randomfrom concurrent import futuresdef func(n): time.sleep(random.randint(1,3)) print(n) return n*"*"thread_pool = futures.ThreadPoolExecutor(20) # 实例化一个线程池对象,一般开启CPU核数*5f_l = []for i in range(10): t = thread_pool.submit(func,i) # submit方法合并了创建线程对象和激活线程的功能 f_l.append(t)thread_pool.shutdown() # shutdown方法封装了close和join方法,调用该方法时,不能在忘池中添加任务,且要等待池中任务执行结束for j in f_l: print(j.result()) # result方法取线程执行的函数返回值

示例2:回调

from concurrent import futuresdef func(n): print(n) return n*"*"def call(args): print(args.result())thread_pool = futures.ThreadPoolExecutor(20)thread_pool.submit(func,1).add_done_callback(call)