掌握高并发、高可用架构第二课 并发编程

从本课开始学习并发编程的内容。主要介绍并发编程的基础知识、锁、内存模型、线程池、各种并发容器的使用。

第九节 线程池

线程池 Executors Executor ExecutorService ThreadPoolExecutor

为何要使用线程池

有以下几个原因:

线程池改进了多线程应用程序的响应时间:由于线程池中的线程已经准备好且等待被分配任务,可以直接拿来使用而不用新建一个线程线程池节省了为每个短生命周期任务而创建一个完整的线程开销,并且可以在任务完成后回收资源线程池根据当前在系统中运行的进程来优化线程时间片线程池运行我们开启多个任务而不用为每个线程单独设置属性线程池允许我们为正在执行的任务的程序参数传递一个包含状态信息的对象引用线程池可以用来解决处理一个特定请求最大线程数量限制的问题

根本上说,我们使用线程池主要就是为了减少创建和销毁线程的次数,每个线程 都可以重复利用,可以执行多个任务,从而节省内存(线程开的越多,消耗的内存越大),提高了资源利用率。

线程池ThreadPoolExecutor

代码起始是Executor,这是一个接口。

抽象类AbstractExecutorService实现了接口ExecutorServiceExecutorService又继承了ExecutorAbstractExecutorService的默认实现类是ThreadPoolExecutor,这个类就是线程池。

ThreadPoolExecutor有4个构造函数:

public ThreadPoolExecutor(int corePoolSize, int maximunPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Running> workQueue);public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Running> workQueue, ThreadFactory factory);public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Running> workQueue, RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Running> workQueue, ThreadFactory factory, RejectedExecutionHandler handler);

corePoolSize:线程池中核心线程的最大数量

核心线程,线程池创建后,如果当前线程总数小于corePoolSize,则新建的就是核心线程,如果超过corePoolSize,则新建的就是非核心线程。

核心线程默认情况下会一直存活在线程池中,即使这个核心线程处于闲置状态。

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut为true,那么核心线程如果处于闲置状态的话,超过一定时间(keepAliveTime)也会被销毁。

maximumPoolSize: 线程池中线程总数的最大值

线程总数 = 核心线程数 + 非核心线程数

keepAliveTime: 线程池中非核心线程闲置超时时长

默认情况下,一个非核心线程,如果闲置时长超过该参数设置,就会被销毁。如果设置参数allowCoreThreadTimeOut为true,则超时时长也会作用于核心线程。

TimeUnit unit: 时长单位

枚举值,MILLSECONDS:毫秒;SECONDS:秒;MINUTS:分钟;HOURS:小时;DAYS:天

workQueue: 阻塞队列

线程池中的任务队列,维护着等待执行的Runnable对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果这个队列满了,则会新建非核心线程来执行任务。

public interface BlockingQueue<E> extends Queue<E> { // 将指定元素添加到队列,成功返回true,否则抛出异常;如果是给限定了长度的队列中添加元素,推荐offer boolean add(E e); // 将指定的元素添加的队列,如果成功则返回true,否则返回false;元素不能为空,否则抛出NPE boolean offer(E e); // 将元素添加到队列,如果队列没有多余空间,方法会一直阻塞,直到队列有多余空间 boolean put(E e) throws InterruptedException; // 将元素在指定的时间内添加到队列中,成功返回true,否则返回false boolean off(E e, long timeout, TimeUnit unit) throws InterruptedException; // 从队列中获取值,如果队列没有元素,方法会一直阻塞,直到队列有值 E take() throws InterruptedException; // 在给定的时间内获取队列中的值,没有获取到会抛出异常 E poll(long timeout, TimeUnit unit) throws InterruptedException; // 获取队列的剩余空间 int remainingCapacity(); // 移除指定的值 boolean remove(Object o); // 判断队列是否包含值 boolean contains(Object o); // 将队列中的所有元素都移除,并设置到指定集合中 int drainTo(Collection<? extends E> c);}

一般来说,workQueue有以下四种队列类型:

SynchronousQueue:同步队列,这种队列在接收到任务时,会直接提交给线程处理,而不会保留它。假如所有线程都在忙碌,则会新建线程来处理这个任务。所以为了防止出现线程数达到maximumPoolSize而不能新建线程的错误,当使用这种队列时,需要把maximumPoolSize指定为Integer.MAX_VALUE。

LinkedBlockingQueue:×××链表阻塞队列,这种队列接收到任务时,如果当前线程数少于核心线程数,则会新建核心线程来处理任务;如果当前线程数达到核心线程数,则会保持到该队列中;由于队列×××,即所有超过核心线程数的任务都会存入队列,所以会导致maximumPoolSize失效。

ArrayBlockingQueue:有界数组阻塞队列,接收到任务时,如果当前线程数少于核心线程数,则会新建核心线程来处理任务;如果当前线程数达到核心线程数,则会保持到该队列中;如果队列已满,则新建线程来执行任务,如果队列已满,而且线程数已达到maximumPoolSize指定的数量,则会报错。

DelayQueue:延迟队列,队列元素必须实现Delayed接口,接收到任务时,先入队列,只有达到了指定时间,才会执行任务。

ThreadFactory factory: 创建线程的方式

这是一个接口,通过调用它的方法: Thread newThread(Runnable r)来创建线程

RejectedExecutionHandler handler: 线程池无法创建线程时,如何抛出异常

一般是当线程池中的线程数量已经达到最大值,或者线程池已经关闭时,会抛出一个RejectedExecutionException

既然线程池新添加了任务,那么线程池是如何处理这些批量任务?

如果线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务如果线程数量达到了corePoolSize,则将任务移入队列等待如果队列已满,新建线程(非核心线程)执行任务如果队列已满,总线程数又达到了maximumPoolSize,就会由RejectedExecutionHandler抛出异常四种线程池

1 newFixedThreadPool:定长的线程池,可控制线程的最大并发数,超出的任务会在队列中等待。

public class Executors { public static ExecutorService newFixedThreadPool(int nThread) { return new ThreadPoolExecutor(nThread, nThread, 0L, TimeUnit.MILLSECONDS, new LinkedBlockingQueue<Runnable>()); }}

2 newCachedThreadPool:可缓存的线程池,如果线程池长度超过需要,可以灵活回收空闲线程,如无可回收线程,则会新建线程来处理任务。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}

3 newScheduledThreadPool:定长任务线程池,支持定时和周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

4 newSingleThreadExecutor:一个 单线程的线程池,只有一个线程来执行任务。

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLSECONDS, new LinkedBlockingQueue<Runnable>()); );}