AQS-ReentrantLock实现原理
AbstractQueuedSynchronizer (AQS)类如其名,抽象的队列式同步容器,AQS定义类一套多线程访问共享资源的同步器,许多同步类的实现都依赖于它,比如之前学习的ReentrantLock/Semaphore/CountDownLatch。
1.AQSAQS阻塞队列.png
1。自定义同步器在实现时只需要实现共享资源state的获取于释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,自定义同步容器实现时主要实现以下几种方法:
2.isHeldExclusively():该线程是否正在独占资源,只有用到condition才需要取实现它。
3.tryAcquire(int):独占方式,尝试获取资源,成功则返回true,失败则返回false。
4.tryRelease(int):独占方式,尝试释放资源,成功则返回true,失败则返回false。
5.tryAcquireShared(int):共享方式,尝试获取资源,附属表示获取失败,0表示获取成功,但是没有剩余资源可用,其他线程不能在获取,正数表示获取成功,并且有剩余资源,也就是自己可以重复获取(可重入)。
6.tryReleaseShare(int):共享方式。尝试释放资源,成功返回true,失败返回false。
7.以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()的时候,会调用tryAcquire()独占该锁并将state+1。此后其他线程在tryAcquire()独占该锁并将state+1。此后表示其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(几锁释放)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
2.1 Reentrantlock类结构
public class ReentrantLock implements Lock, java.io.Serializable
可以看出Reentrantlock 实现类lock接口,首先看下这个lock接口。
public interface Lock { //普通的获取锁的方法,lock会阻塞直到成功 void lock(); //与lock不同的时,它可以相应中断, void lockInterruptibly() throws InterruptedException; //尝试获取锁,立即返回,不阻塞,成功返回true,失败返回false boolean tryLock(); //先尝试获取锁,如果成功则返回true,失败则阻塞等待,等待时间由参数决定,在等待的同时相应中断,抛出中断异常,如果在等待的过程中获得类锁则返回true,否则直到时间超时,则返回false。boolean tryLock(long time, TimeUnit unit) throws InterruptedException;//普通释放锁的方法 void unlock();//新建一个条件,lock可以关联多个条件,关于条件在以后篇幅中记录 Condition newCondition();}
可以看出,显示锁,与synchronied相比,支持非阻塞的方式获取锁,可以响应中断,可以限时,这使得它非常的灵活。
现在回到Reentrantlock类,可以看到其内部基本上定义类三个内部类:
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; ..... abstract static class Sync extends AbstractQueuedSynchronizer { ...}static final class NonfairSync extends Sync {....}....static final class FairSync extends Sync {.....}.....}
很显然从上面的类结构,我们可以得出jdk利用CAS算法和AQS实现类ReentrantLock。
2.2. 构造函数分析
/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
ReentrantLock出于性能考虑,默认采用非公平锁。
2.3. 获取锁
2.3.1. 非公平模式下的加锁
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //直接通过CAS算法,通过改变共享资源-state值,成功则设置当前线程为持有锁的线程(独占),失败则调用顶层AQS的acquire方法,再次尝试,失败则将线程放到阻塞队列 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ... }
2.3.2. 公平模式下的加锁
/** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; //获取锁 final void lock() { acquire(1); //在这里调用的时AQS顶层方法,获取失败则将线程放到阻塞队列末尾 } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //当前资源还没有被其他线程锁定 if (c == 0) { //没有其他线程正在竞争当前的临界资源,并且通过CAS加锁成功,则设置当前的线程为独占此资源的线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果此时临界资源已经被锁定,则先判断持有者是否时当前线程 else if (current == getExclusiveOwnerThread()) { //如果是持有者,则可重复获取锁,同时更新状态值,注意加几次锁就要释放几次,直到为0 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
2.3.3、acquireInterruptibly解析
public void lockInterruptibly() throws InterruptedException { //调用AQS顶层方法 sync.acquireInterruptibly(1); } //AQS顶层方法 public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } //中断响应 private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 大致过程如下: (1)如果线程已经中断则直接抛出异常 (2)如果线程还没中断,则通过自旋方式,申请锁直至到当前线程获得锁,或失败,然后响应中断。
2.4 释放锁
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
其释放独占锁的过程大致如下:
(1)先获取当前资源状态,减去释放release,一般是1;
(2)判断当前线程是否锁定资源的线程,如果不是则抛出异常;
(3)假如资源状态值为0,则说明锁已经释放,将独占线程至空,更新状态值,返回设置成功或失败
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。