Concurrent:

1.BlockingQueue(阻塞队列)

ArrayBlockingQueue(指定容量,不可变),LinkeBlocingQueue(指定容量不可变,也可以不指定容量,默认Integer.Max_value)

PriorityBlockingQueue(根据实现的接口自定义排序,只有在逐个拿取的时候才有序)

SynchronousQueue(长度以1只能为1)

2.ConcurrentMap

ConcurrentHashMap

1.5 分桶加锁,1.8 CAS+红黑树来保证线程安全

ConcurrentNavigableMap(针对有序列表,有map.headMap map.subMap map.tailMap , 返回有序的在取出范围的map)

3.CountDownLatch闭锁

CountDownLatch 以一个给定的数量初始化。 countDown() 每被调用一次,这一数 量就减一。通过调用 await() 方法之一,线程可以阻塞等待这一数量到达零。

等待一定数量的线程完成。来执行其后的程序

4.CyclicBarrier栅栏

它能够对处理一些算法的线程实现同
步。换句话讲,它就是一个所有线程必须等待的一个栅栏,直到所有线程都到达这 里,然后所有线程才可以继续做其他事情

5.Exchanger 交换机

类表示一种两个线程可以进行互相交换对象的会和点,只能两个线程之间交换数据

6.Semaphore信号量

l acquire()
l release()
计数信号量由一个指定数量的 "许可" 初始化。每调用一次 acquire(),一个许可会 被调用线程取走。每调用一次 release(),一个许可会被返还给信号量。因此,在没 有任何 release() 调用时,最多有 N 个线程能够通过 acquire() 方法, N 是该信 号量初始化时的许可的指定数量。

线程池

7.ExecutorService

(executors.newFixedThreadPool,长任务场景,只有核心线程,没有临时线程,容纳无限多

,newCachedThreadPool(高并发短任务场景,没有核心线程,全部都是临时线程,处理任意多的线程)

,newSingleThreadPool

,newSchedulerThreadPool(有核心线程,有临时线程)

)

提交线程的方法

execute() 提交线程 没有返回值submit(Runnable) 提交线程,返Future 可以通过Future.get()得到该线程的状态但是如果该线程未执行完成,那么该方法阻塞

submit(Callable) 同上面类似,但是线程可以带有返回值

invokeAny(.....),随机选择线程执行一个

invokeAll(),自动执行所有的线程

关闭线程池:ExecutorService.shutdown();该方法不再接受线程池,等待所有线程执行完毕后,线程池结束。

ExecutorService.shutdown();立即关闭,退出任务,正在执行的线程可能会出错。

##Callable只能用线程池提交

Callable runnable :

1.返回值

2.异常,runnabel没有容错机制,callable有容错机制,可以将 异常抛给上层处理

3.Callable只能通过submit方法提交,runnable可以new 也可 以通过线程池提交

8.ReadWriteLock 读写锁,可以是公平,也可以是非公平的。该锁可以跨方法

读锁可以共享,写锁互斥

读锁 readLock().lock();

写锁 writeLock().lock();




packagehgs.test;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.BlockingQueue;importjava.util.concurrent.BrokenBarrierException;importjava.util.concurrent.ConcurrentNavigableMap;importjava.util.concurrent.ConcurrentSkipListMap;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.CyclicBarrier;importjava.util.concurrent.Exchanger;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;importjava.util.concurrent.PriorityBlockingQueue;importjava.util.concurrent.RejectedExecutionHandler;importjava.util.concurrent.Semaphore;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;importjava.util.concurrent.locks.ReentrantReadWriteLock;publicclassTest{publicstaticvoidmain(String[]args)throwsInterruptedException,BrokenBarrierException{//ConcurrentMap//BlockingQueue<String>bq=newArrayBlockingQueue<String>(4);//add(超过长度会报错)remove(没有元素会报错)/*bq.add("1");bq.add("1");bq.add("1");bq.add("1");bq.remove();bq.remove();bq.remove();bq.remove();*///offer(如果可以插入返回true否则false)poll(如果没有元素返回null)//bq.offer("1");//bq.offer("1");//bq.offer("1");//bq.offer("1");//bq.offer("1");//Stringflag=bq.poll();//System.out.println(flag);//puttake阻塞式//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.put("1");//bq.take();//offer(o,timeout,timeunit),poll(timeout,tameunit)等待timeout时间,然后跳过//bq.poll(10,TimeUnit.SECONDS);//bq.element();//检查是否为空,是的话跑出异常//bq.peek();//阻塞/*ConcurrentNavigableMap<Integer,String>m=newConcurrentSkipListMap<Integer,String>();m.put(1,"1");m.put(2,"2");m.put(5,"5");m.put(4,"4");m.put(6,"6");m.put(5,"5");System.out.println("head(\"5\")"+m.headMap(4));System.out.println();System.out.println();*//*Boyb1=newBoy("b1",24);Boyb2=newBoy("b2",23);Boyb3=newBoy("b3",26);Boyb4=newBoy("b4",19);PriorityBlockingQueue<Boy>pbq=newPriorityBlockingQueue<Boy>(100);pbq.add(b1);pbq.add(b2);pbq.add(b3);pbq.add(b4);for(inti=0;i<4;i++){System.out.println(pbq.take().toString());}*///闭锁/*CountDownLatchcdl=newCountDownLatch(4);for(inti=0;i<4;i++){newThread(newBoyRun(cdl)).start();}cdl.await();System.out.println("全部到达。。。");*///栅栏/*CyclicBarriercb=newCyclicBarrier(4);for(inti=0;i<4;i++){newThread(newGirlRan(cb)).start();}cb.await();System.out.println("allcomming.");*///exchanger交换器/*Exchanger<String>ex=newExchanger<String>();ExchangerTeste1=newExchangerTest(ex);ExchangerTeste2=newExchangerTest(ex);newThread(e1).start();newThread(e2).start();*///6.Semaphore信号量/*Semaphores=newSemaphore(5);for(inti=0;i<9;i++){newSemaphoreTest(s).start();}*///原始创建线程池,executors.newCacheThreadPool的底层调用该方法/*ExecutorServicees=newThreadPoolExecutor(5,10,10,TimeUnit.SECONDS,newArrayBlockingQueue<>(10),newRejectedExecutionHandler(){@OverridepublicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){System.out.println("full......");}});for(inti=0;i<24;i++){es.execute(newThreadPoolTest());}es.shutdown();*///可重入锁ReentrantLock可重入读写锁ReentrantReadWriteLock/*ReentrantReadWriteLockrwLock=newReentrantReadWriteLock();for(inti=0;i<3;i++){newReadLockTest(rwLock).start();}*/ReentrantReadWriteLockrwLock=newReentrantReadWriteLock();for(inti=0;i<3;i++){newWriteLockTest(rwLock).start();}}}classWriteLockTestextendsThread{ReentrantReadWriteLockrwLock;publicWriteLockTest(ReentrantReadWriteLockrwLock){this.rwLock=rwLock;}@Overridepublicvoidrun(){rwLock.writeLock().lock();System.out.println("reading......");try{Thread.sleep((long)(Math.random()*2000));}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}System.out.println("done......");rwLock.writeLock().unlock();}}classReadLockTestextendsThread{ReentrantReadWriteLockrwLock;publicReadLockTest(ReentrantReadWriteLockrwLock){this.rwLock=rwLock;}@Overridepublicvoidrun(){rwLock.readLock().lock();System.out.println("reading......");try{Thread.sleep((long)(Math.random()*2000));}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}System.out.println("done......");rwLock.readLock().unlock();}}classThreadPoolTestimplementsRunnable{ThreadPoolTest(){}@Overridepublicvoidrun(){System.out.println(Thread.currentThread().getName());try{Thread.sleep((long)(Math.random()*2000));}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}}}classSemaphoreTestextendsThread{Semaphores=null;publicSemaphoreTest(Semaphores){this.s=s;}@Overridepublicvoidrun(){try{s.acquire();System.out.println("aquire......");Thread.sleep((long)(Math.random()*3000));}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}System.out.println("release......");s.release();}}classExchangerTestimplementsRunnable{Exchanger<String>ex=null;publicExchangerTest(Exchanger<String>ex){this.ex=ex;}@Overridepublicvoidrun(){Stringmy=Thread.currentThread().getName();Stringexstr=null;try{exstr=ex.exchange(my);}catch(InterruptedExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}System.out.println(my+""+exstr);}}classBoyRunimplementsRunnable{CountDownLatchcdl;publicBoyRun(CountDownLatchcdl){this.cdl=cdl;}@Overridepublicvoidrun(){System.out.println("cdl"+"+1");cdl.countDown();System.out.println("cdl"+"+1--");}}classGirlRanimplementsRunnable{CyclicBarriercb;publicGirlRan(CyclicBarriercb){this.cb=cb;}@Overridepublicvoidrun(){System.out.println("cdl"+"+1");try{Thread.sleep((long)(Math.random()*5000));cb.await();}catch(InterruptedException|BrokenBarrierExceptione){//TODOAuto-generatedcatchblocke.printStackTrace();}System.out.println("cdl"+"+1--");}}classBoyimplementsComparable<Boy>{Stringname;intage;publicBoy(Stringname,intage){this.name=name;this.age=age;}publicStringgetName(){returnname;}publicvoidsetName(Stringname){this.name=name;}publicintgetAge(){returnage;}publicvoidsetAge(intage){this.age=age;}publicintcompareTo(Boyo){returnthis.age-o.age;}@OverridepublicStringtoString(){return"Boy[name="+name+",age="+age+"]";}}