Java并发编程基本知识
并发编程的原则
原子性原子性是指在一个操作中就是cpu不可以在中途暂停然后再调度,既不被中断操作,即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
可见性对于可见性,Java提供了volatile关键字来保证可见性。当一个共享变量被volatile修饰时,它会保证修改的值会立即被更新到主存,当有其他线程需要读取时,它会去内存中读取新值。而普通的共享变量不能保证可见性,因为普通共享变量被修改之后,什么时候被写入主存是不确定的,当其他线程去读取时,此时内存中可能还是原来的旧值,因此无法保证可见性。另外,通过synchronized和Lock也能够保证可见性,synchronized和Lock能保证同一时刻只有一个线程获取锁然后执行同步代码,并且在释放锁之前会将对变量的修改刷新到主存当中。
有序性在Java内存模型中,允许编译器和处理器对指令进行重新排序,但是重新排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
Runnable和Thread
这里只说一下实现Runnable接口和继承Thread类的区别:以卖10张票的任务为例,如果继承Thread类的话,启动三个线程就相当于开了三个窗口,每个窗口都有卖10张票的任务,各卖各的;如果实现Runnable接口的话,启动三个线程相当开了三个窗口卖票,这三个窗口一共卖10张票。
1. synchronized对象锁
synchronized(this)和synchronized方法都是锁当前对象,synchronized(obj)锁临界对象。使用synchronized的话最好是锁临界对象。如果想要使得任意多个线程任意多个用户访问的时候都不出任何问题,可以考虑一下用锁当前对象的方法,因为锁当前对象量级较重,所以一般不用。
如下面Sync类中的两个方法test_01和test_02()锁的都是程序创建的Sync对象,细粒度控制推荐用test_02()。
public synchronized void test_01() { System.out.println("锁当前对象");}public void test_02() { synchronized (this) { System.out.println("锁当前对象"); }}
下面这个方法锁的是Sync对象中的object对象(即临界对象)
public void test_03() { synchronized (object) { System.out.println("锁临界对象"); }}
2. synchronized使用在静态方法中锁定当前类
静态同步方法锁的是当前类型的类对象,如在Sync类中的static test_04()方法上加了同步锁synchronized,那么此时synchronized锁的是Sync.class。
// 下面两个方法都是静态同步方法
public static synchronized void test_04() { System.out.println("锁Sync.class");}public static void test_05() { synchronized (Sync.class) { System.out.println("锁Sync.class类"); }}
3. synchronized作用于静态和非静态方法的区别
synchronized作用与非静态方法,相当于锁定单个对象,不同对象之间没有竞争关系;而作用于静态方法时,锁加载类上,即锁定class,这时相当于所有对象竞争同一把锁。
同步代码块中抛出异常,锁被释放如下例子,线程1会在i=5的时候抛出异常,此时线程1锁被释放,线程2开始调用方法。
public class Test { static class Test02 implements Runnable { private int i = 0; @Override public synchronized void run() { while (true) { System.out.println(Thread.currentThread().getName() + "_" + i++); if (i == 5) { // 当i==5时抛出异常,锁被释放 i = 1 / 0; } try { TimeUnit.SECONDS.sleep(1); }catch (InterruptedException ignored) { } } } }
public static void main(String[] args) { Test02 test02 = new Test02(); new Thread(test02, "LQ").start(); new Thread(test02, "WH").start();}
}
实例分析在下面代码中,object被LQ锁定,WH阻塞。
public class Test {
static Object object = new Object();
void m() {
System.out.println(Thread.currentThread().getName() + " start...");
synchronized (object){
while (true) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception ignored) {}
System.out.println(Thread.currentThread().getName() + "-" + object.hashCode());
}
}
}
static class Test01 implements Runnable { @Override public void run() { new Test().m(); }}static class Test02 implements Runnable { @Override public void run() { new Test().m(); }}public static void main(String[] args) { Test01 test01 = new Test01(); Thread thread = new Thread(test01, "LQ"); thread.start(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception ignored) {} Test02 test02 = new Test02(); thread = new Thread(test02, "WH"); thread.start();}
}
在WH线程中新创建了一个Object,WH正常运行。
public class Test { static Object object = new Object(); void m() { System.out.println(Thread.currentThread().getName() + " start..."); synchronized (object) { while (true) { try { TimeUnit.SECONDS.sleep(1); } catch (Exception ignored){} System.out.println(Thread.currentThread().getName() + "-" + object.hashCode()); } } } static class Test01 implements Runnable { @Override public void run() { new Test().m(); } } static class Test02 implements Runnable { @Override public void run() { object = new Object(); new Test().m(); } } public static void main(String[] args) { Test01 test01 = new Test01(); Thread thread = new Thread(test01, "LQ"); thread.start(); try { TimeUnit.SECONDS.sleep(3); } catch (Exception ignored) {} Test02 test02 = new Test02(); thread = new Thread(test02, "WH"); thread.start(); }}
上面代码中,WH线程启动后会一只处于等待状态,因为object被LQ线程锁着,但如果在WH线程中重新new Object()并赋值给object,这样的话WH线程就能够正常运行了,原因是:同步锁锁定的是对内存中的对象,所以LQ锁定的是第一次new的对象而WH锁定的是第二次new的对象,如下图。
对于常量:String a = “aaa” 和String b = “aaa”是同一个对象,因此,假如A方法锁定了a,B方法锁定了b,启动LQ线程调用A方法,然后启动WH线程调用B方法,这样的话WH线程会等到LQ线程结束后才执行。因此,在定义同步代码块时,不要使用常量作为锁的目标对象。
volatile关键字
计算机中有CPU、内存和缓存,当CPU运行的时候,默认找缓存中的数据。当CPU有中断的时候,根据操作系统对CPU的管理特性,可能会清空缓存,重新将内存中的数据读到缓存中,也可能不清空缓存,仍旧使用缓存中的数据进行后续的计算。如果CPU不中断的话,默认CPU只会找缓存数据。volatile这个关键字不是改变缓存数据特性的,而是直接改变内存中的数据特性,当对一个对象加了volatile关键字修饰的时候,相当于通知了底层OS操作系统,告诉CPU每次进行计算的时候最好去看一下内存数据是否发生了变更,这就是内存的可见性。volatile关键字就是为了保证内存的可见性。
如下代码会发生死锁现象。
public class Volatile01 { private static boolean b = true; private void m() { System.out.println("start..."); while (b) {} System.out.println("end..."); } static class Volatile_01 implements Runnable { @Override public void run() { new Volatile01().m(); } } public static void main(String[] args) { Volatile_01 = new Volatile_01(); new Thread(volatile_01).start(); try { TimeUnit.SECONDS.sleep(1); }catch (InterruptedException ignored) {} b = false; }}
当将上述代码块中的共享变量b用volatile修饰时(保证了可见性),就能够跳出循环了。
public class Volatile01 { private static volatile boolean b = true; private void m() { System.out.println("start..."); while (b){} System.out.println("end..."); } static class Volatile_01 implements Runnable { @Override public void run(){ new Volatile01().m(); } } public static void main(String[] args) { Volatile_01 = new Volatile_01(); new Thread(volatile_01).start(); try{ TimeUnit.SECONDS.sleep(1); }catch (InterruptedException ignored){} b = false; }}
join()方法
将多个线程连在一起,阻塞线程,直到调用join的线程执行完成。
如下程序打印的结果时100000,如果不用join()的话打印的结果将远远小于100000。用join()可以用来等待一组线程执行完毕后再进行后续逻辑处理,以保证数据的正确。
public class Test { private static volatile int count = 0; private void m() { for (int i = 0; i < 10000; i++) { count++; } } static class Test02 implements Runnable { @Override public synchronized void run() { new Test().m(); } } public static void main(String[] args) { Test02 test02 = new Test02(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(test02)); } for (Thread thread : threads) { thread.start(); } for (Thread thread : threads) { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(count); }}
上述代码中用了synchronized关键字来实现原子性,也可以不用synchronized而用AtomicInteger对象,因为AtomicInteger是一个原子性操作对象,代码如下。
public class Test{ private static AtomicInteger count = new AtomicInteger(); private void m(){ for (int i = 0; i < 10000; i++){ count.incrementAndGet(); } } static class Test02 implements Runnable{ @Override public void run(){ new Test().m(); } } public static void main(String[] args){ Test02 test02 = new Test02(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 10; i++){ threads.add(new Thread(test02)); } for (Thread thread : threads){ thread.start(); try{ thread.join(); }catch (InterruptedException e){ e.printStackTrace(); } } System.out.println(count); }}
CountDownLatch对象
CountDownLatch相当于一个门闩,在创建门闩对象的时候可以指定锁的个数,若某个方法调用了门闩的await()方法,那么该方法执行到await()的时候会被阻塞等待门闩释放,当门闩上没有锁也就是门闩开放的时候继续执行。减门闩上锁的方法时countDown()。
如下例,当在m1中调用了await(),在m2中调用了countDown(),因此根据m2的逻辑当m2执行完了之后门闩上的锁数量就为0了,此时m1方法可以继续执行了。
public class Test { private CountDownLatch countDownLatch = new CountDownLatch(5); private void m1() { try { countDownLatch.await(); // 等待门闩开放 } catch (Exception ignored) { } System.out.println("method m1."); } private void m2() { while (countDownLatch.getCount() != 0) { countDownLatch.countDown(); // 减门闩上的锁 System.out.println("method m2"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException ignored) { } } }
public static void main(String[] args) { Test count01 = new Test(); new Thread(count01::m2).start(); new Thread(count01::m1).start();}
}
门闩可以和锁混合使用,或替代锁的功能,再门闩开放之前等待,当门闩完全开放之后执行,可避免锁的效率低下问题。
wait()、notify()和notifyAll()
wait():在对象上调用wait(), 会使当前线程进入等待状态, 直至另一个线程对这个对象调用了notify() 或notifyAll() 方法唤醒线程。
notify():唤醒对象正在等待的一个线程。
notifyAll():当调用对象的notifyAll()方法时,所有waiting状态的线程都会被唤醒。
(生产者消费者)自定义同步容器,容器上限为10,可以在多线程中应用,并保证数据线程安全。
public class DeviceSingleton<E> { private DeviceSingleton() { } private final int max = 10; private int count = 0; private static final DeviceSingleton DEVICE_SINGLETON = new DeviceSingleton(); public static DeviceSingleton getInstance() { return DEVICE_SINGLETON; } private final List<E> devices = new ArrayList<>(); /** * 添加 */ public synchronized void add(E data) { // 当容器满了之后进入等待状态 while (devices.size() == max) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("add: " + data); ThreadUtils.sleep(1000); devices.add(data); count++; this.notify(); } /** * 获取 */ public synchronized E get() { E data = null; while (devices.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } ThreadUtils.sleep(1000); data = devices.remove(0); count--; this.notifyAll(); return data; } /** * 获取长度 */ public synchronized int size() { return count; } @Data static class Device { private int id; private String name; public Device(int id, String name) { this.id = id; this.name = name; } } static class ThreadUtils { public static void sleep(int millis) { try { Thread.sleep(millis); } catch (Exception ignore) {} } }}
public class Test { public static void main(String[] args) throws InterruptedException { DeviceSingleton deviceSingleton = DeviceSingleton.getInstance(); for (int i = 0; i < 10; i++) { new Thread(() -> { for (int j = 0; j < 5; j++) { System.out.println(deviceSingleton.get()); } }, "consumer-" + i).start(); } Thread.sleep(2000); for (int i = 0; i < 2; i++) { new Thread(() -> { for (int j = 0; j < 25; j++) { deviceSingleton.add(new DeviceSingleton.Device(j, "device " + j)); } }, "producer").start(); } }}
ReentrantLock锁
重入锁为尽量避免使用synchronized和同步方法出现的一种多线程锁机制,建议使用的同步方式,效率比synchronized高。使用重入锁时,需要手动释放锁(lock.unlock())。示例如下:
public class ReentrantLockTest { private final Lock lock = new ReentrantLock(); private void m1() { lock.lock(); // 加锁 for (int i = 0; i < 10; i++) { System.out.println("method m1() " + i); ThreadUtils.sleep(1000); } lock.unlock(); // 解锁 } private void m2() { lock.lock(); // 加锁 System.out.println("method m2()"); lock.unlock(); // 解锁 } public static void main(String[] args) { ReentrantLockTest reentrantLockTest = new ReentrantLockTest(); new Thread(reentrantLockTest::m1).start(); new Thread(reentrantLockTest::m2).start(); }}
尝试锁 lock.tryLock()
如果没有获取到锁标记则返回false,当前线程等待,如果获取到了锁标记,则返回true,当前线程被锁定执行。示例如下:
public class ReentrantLockTest { private Lock lock = new ReentrantLock(); private void m1() { lock.lock(); // 加锁 for (int i = 0; i < 10; i++) { ThreadUtils.sleep(1000); System.out.println("method m1() " + i); } lock.unlock(); // 解锁 }
private void m2() { boolean isLocked = false; try { /* 尝试锁,如果有锁,则无法获取锁标记,返回false,否则返回true 如果无法获取到锁标记,则说明别的线程正在使用锁,该线程等待 如果获取到了锁标记,则该线程的代码块被锁定 下面是获取锁标记的无参方法,当执行到该语句的时候立刻获取锁标记 也可以用有参的,即当执行到该语句多长时间之内获取锁标记,如果超时,不等待,直接返回。如isLocked = lock.tryLock(5, TimeUnit.SECONDS);表示5秒之内获取锁标记(5秒之内任何时间获取到锁标记都会继续执行),如果超时则直接返回。 */ isLocked = lock.tryLock(); System.out.println(isLocked ? "m2() synchronized" : "m2() unsynchronized"); } catch (Exception e) { e.printStackTrace(); } finally { // 尝试锁在解除锁标记的时候一定要判断是否获取到锁标记 if (isLocked) { lock.unlock(); } }}
public static void main(String[] args) { ReentrantLockTest reentrantLockTest = new ReentrantLockTest(); new Thread(reentrantLockTest::m1).start(); new Thread(reentrantLockTest::m2).start();}}
可中断锁lock.lockInterruptibly()
非可中断锁当客户端调用interrupt方法时,只是简单的去设置interrupted中断状态,并没有进一步抛出异常,而可中断锁在监测到中断请求时会抛出InterruptedException ,进而中断线程执行。示例如下:
public class ReentrantLockTest { private Lock lock = new ReentrantLock(); private void m1() { lock.lock(); // 加锁 for (int i = 0; i < 5; i++) { ThreadUtils.sleep(1000); System.out.println("method m1() " + i); } lock.unlock(); // 解锁 } private void m2() { try { /* 可打断锁,阻塞等待锁,可以被其他的线程打断阻塞状态 */ lock.lockInterruptibly(); // 可尝试打断 System.out.println("method m2()"); } catch (InterruptedException e) { System.out.println("锁被打断"); } finally { try { lock.unlock(); } catch (Exception ignored) { } } }
public static void main(String[] args) { ReentrantLockTest reentrantLockTest = new ReentrantLockTest(); Thread thread1 = new Thread(reentrantLockTest::m1); thread1.start(); ThreadUtils.sleep(1000); Thread thread2 = new Thread(reentrantLockTest::m2); thread2.start(); ThreadUtils.sleep(1000); thread2.interrupt(); // 打断线程休眠}
}
注意:用ReentrantLock打断锁,如果要打断的话是用线程打断,跟唤醒不同,notifyAll唤醒是用对象区唤醒。(打断thread.interruped(); 唤醒object.notifyAll())。
线程打断有什么用呢?
我们在用Windows的时候经常会遇到软件锁死的问题,这时候我们往往会通过打开任务管理器来结束进程,这种结束进程可以认为是打断锁的阻塞状态(即非正常结束)。
公平锁先到先得。若没有特殊情况,不建议使用公平锁,如果使用公平锁的话,一般来说并发量<=10,如果并发量较大,而不可避免的有访问先后顺序的话,建议采用别的方法。
public class ReentrantLockTest { static class TestReentrantLock extends Thread { // 在创建ReentrantLock对象的时候传参为true就代表创建公平锁 private ReentrantLock lock = new ReentrantLock(true); public void run() { for (int i = 0; i < 5; i++) { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " get lock."); ThreadUtils.sleep(1000); } finally { lock.unlock(); } } } }
public static void main(String[] args) { TestReentrantLock lock = new TestReentrantLock(); lock.start(); new Thread(lock).start(); new Thread(lock).start();}
}
Condition为Lock增加条件,当条件满足时做一些事情,如加锁或解锁、等待或唤醒等。下面示例就是使用Condition实现的生产者消费者。
public class DeviceContainer<T> {
private DeviceContainer() {
}
private static final DeviceContainer DEVICE_CONTAINER = new DeviceContainer<>();public static DeviceContainer getInstance() { return DEVICE_CONTAINER;}private final List<T> list = new LinkedList<>();private final int max = 10;private int count = 0;private Lock lock = new ReentrantLock();private Condition producer = lock.newCondition();private Condition consumer = lock.newCondition();public void add(T t) { lock.lock(); try { while (this.size() == max) { System.out.println(Thread.currentThread().getName() + " 等待"); // 当数据长度为max的时候,生产者进入等待队列,释放锁标记 // 借助条件进入的等待队列 producer.await(); } System.out.println(Thread.currentThread().getName() + " 添加"); list.add(t); count++; // 借助条件唤醒所有的消费者 consumer.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); }}public T get() { T t = null; lock.lock(); try { while (this.size() == 0) { System.out.println(Thread.currentThread().getName() + " 等待"); // 借助条件使消费者进入等待队列 consumer.await(); } System.out.println(Thread.currentThread().getName() + " 获取"); t = list.remove(0); count--; // 借助条件唤醒所有生产者 producer.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t;}private int size() { return count;}
}
public class Test {
public static void main(String[] args) throws InterruptedException {
DeviceContainer<Device> deviceSingleton = DeviceContainer.getInstance();
for (int i = 0; i < 10; i++) {
new Thread(() ->
{
for (int j = 0; j < 5; j++) {
System.out.println(deviceSingleton.get());
}
}, "consumer-" + i).start();
}
ThreadUtils.sleep(1000);
for (int i = 0; i < 2; i++) {
new Thread(() ->
{
for (int j = 0; j < 25; j++) {
deviceSingleton.add(new Device(j, "device " + j));
}
}, "producer-" + i).start();
}
}
}
Java中的同步容器Map/SetConcurrentHashMap/ConcurrentHashSet:底层哈希实现的Map/Set,效率高,使用底层技术实现的线程安全,量级较synchronized轻。key和value不能为null(不同于HashMap和HashSet)
ConcurrentSkipListMap/ConcurrentSkipListSet:底层跳表实现的Map/Set,有序,线程安全,效率较ConcurrentHashMap/ConcurrentHashSet低。
CopyOnWriteArraySet:底层数组,线程安全,增加和删除效率低,查询效率高。
ListCopyOnWriteArrayList:底层数组,线程安全,增加和删除效率低,查询效率高。
QueueConcurrentLinkedQueue/ ConcurrentLinkedDeue:基础链表同步队列,非阻塞,ConcurrentLinkedQueue底层单向链表,ConcurrentLinkedDeue底层双向链表,均***。
ArrayBlockingQueue/LinkedBlockingQueue:阻塞队列,队列容量不足自动阻塞,队列容量为0自动阻塞。ArrayBlockingQueue底层使用数组,有界;LinkedBlockingQueue底层使用链表,默认***。ArrayBlockingQueue根据调用API的不同,有不同的特性。当容量不足的时候有阻塞能力。add方法在容量不足的时候会抛出异常;put方法在容量不足时阻塞等待;offer默认不阻塞,当容量不足的时候返回false,否则返回true;三参offer可设定阻塞时长,若在阻塞时长内有容量空闲,则添加并返回true,如果阻塞时长范围内无容量空闲,放弃新增数据并返回false。LinkedBlockingQueue的add方法在容量不足的时候会抛出异常;offer方法在容量不足时返回false,否则返回true;三参offer可设定阻塞时长,若在阻塞时长内有容量空闲,则添加并返回true,如果阻塞时长范围内无容量空闲,放弃新增数据并返回false。
PriorityQueue:有限集队列,底层数组,***。
PriorityBlockingQueue:优先级阻塞队列,底层数组,***。
LinkedTransferQueue:转移队列,使用transfer方法实现数据的即时处理。队列使用add保存数据,不做阻塞等待。transfer是TransferQueue的特有方法,转移队列必须要有消费者(take()方法的调用者)。如果没有任何线程消费数据,则transfer方法阻塞。一般用于处理即时消息。
SynchronousQueue:阻塞的同步队列,有界。是一个容量为0的队列,是一个特殊的TransferQuque。必须先有消费线程等待才能使用的队列。add方法无阻塞,若没有消费线程阻塞等待数据,则抛出异常。put方法有阻塞,若没有消费线程阻塞等待数据,则put方法阻塞。
DelayQueue:延时阻塞队列,***。类似轮询机制,一般用来做定时任务。业务场景举例:具有过期时间的缓存,订单过期自动取消等。
线程池
线程池是一个进程级的资源,默认的生命周期和JVM一致,即从开启线程池开始,到JVM关闭为止,是线程池的默认生命周期。如果显式调用shutdown方法,那么线程池执行所有的任务后自动关闭。
Executor接口
线程池顶级接口。Executor中只有一个方法execute,是用来处理任务的一个服务方法。调用者提供Runnable接口的实现,线程池通过执行线程执行这个Runnable。
public class Executor01 {
public static void main(String[] args) {
new Executor_01().execute(() ->
System.out.println(Thread.currentThread().getName() + " test executor.")
);
}
static class Executor_01 implements Executor {@Override
br/>@Override
new Thread(command).start();
}
}
}
ExecutorService
Executor的子接口,与Executor不同的是,它还提供了一个返回值为Future的服务方法submit。
Executors工具类
Executor的工具类,为线程池提供工具方法,可快速创建线程池,所有的线程池类型都实现了这个接口,实现了这个接口就代表有提供线程池的能力。常用方法有:void execute(),Future submit(Callable),Future submit(Runnable),void shutdown,boolean isShutdown(),boolean isTerminated()。
public class Test {
public static void main(String[] args) throws InterruptedException {
// 创建一个长度为5的线程池对象
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 6; i++) {
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " executor.");
ThreadUtils.sleep(1000);
});
}
System.out.println(executorService);
// 优雅关闭 executorService.shutdown(); // 是否已经结束,相当于判断是否回收了资源,因为线程睡眠,此时还未回收,因此为false System.out.println(executorService.isTerminated()); // 是否已经关闭,即是否调用过shutdown方法 System.out.println(executorService.isShutdown()); System.out.println(executorService); ThreadUtils.sleep(1000); // 因为上面睡了5秒,任务都已经执行完了,资源也被回收了,因此为true System.out.println(executorService.isTerminated()); System.out.println(executorService.isShutdown()); System.out.println(executorService);}
}
Future
未来结果,代表线程执行结束后的结果。通过get方法获取线程执行结果。
常用方法:get()、get(long, TimeUnit)和isDown()。
get():阻塞等待线程执行结束并得到返回结果;
get(long, TimeUnit):阻塞固定时长,等待线程结束后的结果,如果在阻塞时长范围内线程未执行结束,抛出异常。
isDown():判断线程是否结束即判断call方法是否已完成,要特别注意,这里的isDown与ExecutorService中的isShutdown不同,isShutdown是用来判断线程是否关闭的。
public class ExecutorServiceTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
testExecutorService();
}
private static void testExecutorService() throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(1); Future<String> future = service.submit(() -> { ThreadUtils.sleep(1000); return Thread.currentThread().getName() + " submit."; }); // 查看任务是否完成即线程是否结束即call方法是否执行结束, // 要注意的是,这里判断是否结束,跟ExecutorService中的isShutDowm不同, isShutdowm是判断线程是否结束,而shutdown表示关闭线程 System.out.println(future.isDone()); // 获取call方法的返回值 System.out.println(future.get()); // false System.out.println(future.isDone()); System.out.println(future.get()); // true // 关闭线程池 service.shutdown();}
}
Callable接口
可执行接口。类似Runnable接口,也是可以启动线程的接口。
接口方法:call(),相当于Runnable中的run方法,区别在于call方法有返回值。
Callable和Runnable的选择:当需要返回值或需要抛出异常时,使用Callable,其他情况任意选。
ThreadPoolExecutor创建线程池
通过new ThreadPoolExecutor来创建,下图是ThreadPoolExecutor的三个构造方法:
参数说明:
corePoolSize 核心线程数
maximumPoolSize 最大线程数
keepAliveTime 线程最大空闲时间
unitTimeUnit 时间单位
workQueueBlockingQueue<Runnable> 线程等待队列
threadFactoryThreadFactory 线程创建工厂
handlerRejectedExecutionHandler 拒绝策略
核心线程数和最大线程数:
当提交一个新任务到线程池时首先判断核心线程数corePoolSize是否已达上限,若未达到corePoolSize上限,创建一个工作线程来执行任务;否则,再判断线程池工作队列workQueueBlockingQueue是否已满,若没满,则将新提交的任务存储在工作队列里;否则,线程池将判断最大线程数是否已达上限,若未达到maximumPoolSize上限,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。如果线程池中的线程数量大于核心线程数 corePoolSize 时,线程空闲时间超过线程最大空闲时间keepAliveTime,则线程将被终止,直至线程池中的线程数目不大于corePoolSize。
自定义线程池
public class ExecutorThreadPoolTest {
public static void main(String[] args) {
testExecutorThreadPool();
}
private static void testExecutorThreadPool() { // 创建线程池,核心线程数为2,最大线程数为4,最大空闲时间为10 ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), new MyTreadFactory(), new MyIgnorePolicy()); // 启动所有核心线程,使其出与等待状态 executor.prestartAllCoreThreads(); // 创建并执行任务 for (int i = 1; i <= 10; i++) { MyTask task = new MyTask(String.valueOf(i)); executor.execute(task); }}static class MyTreadFactory implements ThreadFactory { private final AtomicInteger mThreadNum = new AtomicInteger(1); @Override public Thread newThread(Runnable runnable) { Thread t = new Thread(runnable, "线程【" + mThreadNum.getAndIncrement() + "】"); System.out.println(t.getName() + " 已创建"); return t; }}public static class MyIgnorePolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { doLog(runnable, executor); } private void doLog(Runnable runnable, ThreadPoolExecutor executor) { System.err.println(runnable.toString() + " 被拒绝"); }}@Datastatic class MyTask implements Runnable { private String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println(this.toString() + " 正在运行"); ThreadUtils.sleep(1000); } @Override public String toString() { return "线程【" + name + "】"; }}
}
FixedThreadPool线程池
固定容量的线程池,可由Executors来创建,活动状态和线程池容量是有上限的,需要手动销毁线程池。构造方法如下:
由此可见,该线程池核心线程数和最大线程数均为构造参数值nThreads,线程最大空闲时间为0,任务队列采用LinkedBlockingQueue,默认容量上限是Integer.MAX_VALUE。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() { // 创建容量为10的FixedThreadPool线程池 ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 100; i++) { service.execute(()-> System.out.println(Thread.currentThread().getName())); } // 销毁线程池 service.shutdown();}
}
CachedThreadPool线程池
缓存线程池,通过Executors来创建,默认最大容量为Integer.MAX_VALUE,自动扩容,执行完后自动销毁(这一点与FixedThreadPool不同,FixedThreadPool的销毁需要手动调用shutdown方法)。构造方法如下:
由构造方法可见,核心线程数为0,最大线程数为Integer.MAX_VALUE,最大空闲时间为60秒,任务队列使用SynchronousQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() { // 创建缓存线程池 ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 5; i++) { service.execute(() -> { ThreadUtils.sleep(1000); System.out.println(Thread.currentThread().getName() + " executor."); }); } System.out.println(service); ThreadUtils.sleep(65); System.out.println(service);}
}
ScheduledThreadPool线程池
计划任务线程池,可以根据任务自动执行计划的线程池,由Executors创建,需要手动销毁。计划任务时选用,如需要定时整理数据、服务器定期清除无效文件等。构造方法如下:
核心线程数为构造参数大小,最大线程数为Integer.MAX_VALUE,最大空闲时间0,任务队列使用DelayedWorkQuquq。
常用方法有:scheduledAtFixedRate、schedule、execute等。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() { // 创建计划任务线程池 ScheduledExecutorService service = Executors.newScheduledThreadPool(3); System.out.println(service); // 定期任务,线程池启动500毫秒后第一次执行任务,以后每300毫秒执行一次 service.scheduleAtFixedRate(() -> { ThreadUtils.sleep(1000); System.out.println(Thread.currentThread().getName() + " executor."); }, 500, 300, TimeUnit.MILLISECONDS); System.out.println(service); service.shutdown();}
}
SingleThreadExecutor线程池
单一容量的线程池。需要手动销毁。有保证任务顺序需求时可选用。如大厅中的公共频道聊天,固定数量商品的秒杀等。构造方法如下:
核心线程数和最大线程数均为1,任务队列为LinkedBlockingQueue。
public class Test {
public static void main(String[] args) {
new Test().test();
}
public void test() { // 创建单一容量线程池 ExecutorService service = Executors.newSingleThreadExecutor(); System.out.println(service); for (int i = 0; i < 5; i++) { service.execute(() -> { System.out.println(Thread.currentThread().getName() + " executor."); ThreadUtils.sleep(1000); }); } service.shutdown();}
}
ForkJoinPool线程池
分支合并线程池,适用于处理复杂任务。初始化线程容量与CPU核心数有关。
ForkJoinPool没有所谓的容量,默认都是一个线程,根据任务自动分支新的子线程,,当子线程结束后自动合并。所谓自动合并,是用fork和join两个方法实现的(手动调用)。
线程池中运行的可分治合并的任务必须是ForkJoinTask的子类型(RecursiveTask或RecursiveAction,二者的区别在于一个运行完之后有返回值,一个没有),其中提供了分支和合并能力。
ForkJoinTask提供了两个抽象子类型RecursiveTask和RecursiveAction,RecursiveTask是有返回结果的分支合并任务,RecursiveAction是无返回结果的分支合并任务(类似Callable和Runnable的区别)。
ForkJoinTask提供了一个compute方法,这个方法里面就是任务的执行逻辑。
该线程池主要用于大量数据的计算、数据分析等。
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException { long result = 0L; for (int NUMBER : NUMBERS) { result += NUMBER; } System.out.println(result); ForkJoinPool pool = new ForkJoinPool(); // 分支合并任务 AddTask task = new AddTask(0, NUMBERS.length); // 提交任务 Future<Long> future = pool.submit(task); System.out.println(future.get());}private static final int[] NUMBERS = new int[1000000];private static final int MAX_SIZE = 50000;private static final Random RANDOM = new Random();static { for (int i = 0; i < NUMBERS.length; i++) { NUMBERS[i] = RANDOM.nextInt(1000); }}static class AddTask extends RecursiveTask<Long> { int begin, end; AddTask(int begin, int end) { this.begin = begin; this.end = end; } @Override protected Long compute() { if ((end - begin) < MAX_SIZE) { long sum = 0L; for (int i = begin; i < end; i++) { sum += NUMBERS[i]; } return sum; } else { // 当结束值减去开始值大于临界值的时候进行分支 int middle = begin + (end - begin) / 2; AddTask task1 = new AddTask(begin, middle); AddTask task2 = new AddTask(middle, end); // 分支的工作,就是开启一个新的线程任务 task1.fork(); task2.fork(); // join就是合并,将任务的结果获取,是一个阻塞方法,一定会得到结果数据 return task1.join() + task2.join(); } }}
}
线程组一组线程的集合,线程组中多个线程执行同一批任务,线程之间是隔离的,互不影响。同一组的线程之间可以通信,但不同组的线程之间不能通信,这样就做到了线程屏蔽,保证了线程安全。
public class Test {
public static void main(String[] args) { new Test().test();}public void test() { ThreadGroup group = new ThreadGroup("LQ"); Thread thread = new Thread(group, () -> System.out.println("group is " + Thread.currentThread().getThreadGroup().getName()) ); thread.start();}
}
朋友们觉得内容有什么错误、不足之处,或者有什么疑问,尽可留言指出来,一起学习哦。
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。