Java并发编程问题汇总三:AQS
上一篇,我们了解了Java自带synchronized锁的一些相关知识,本篇文章将会聚焦在concurrent中,所有锁类型都会用到的工具类:AQS(AbstractQueuedSynchronizer),看看concurrent包中的ReentrantLock、CountDownLatch等工具都是如何实现的。
引入:ReentrantLock、NonretrantLock、ReentrantReadWriteLock
公平锁与非公平锁
为了方面后面介绍AQS,我们先看一下ReentrantLock是如何使用AQS来完成自己功能的。先了解AQS的使用,再了解AQS的原理,往往学习效率更高。
ReentrantLock是concurrent包里提供的可重入锁。它提供了公平模式和非公平模式:
- 公平模式:根据申请顺序,线程会进入一个队列。队首的线程出队(释放锁)之后,下一个线程会获取到锁。这种调度方式的优点是,不会出现饥饿现象。缺点是吞吐低。
- 非公平模式:线程获取锁的时候,会直接尝试获取锁(插队),获取不到才需要去排队。
用户可以在构造函数中,指定是否使用公平模式:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过上面可知,公平模式和非公平模式的不同点在与一个叫sync的东西,那这个sync是个什么东西呢?通过下面的代码可以看出来,Sync就是一个抽象类,他继承了AQS,在AQS提供的基础设施的基础上,完成了ReentrantLock另外的基础设施。
abstract static class Sync extends AbstractQueuedSynchronizer {
// ...省略
}
Sync类的内容后面分析,目前的当务之急是分析一下FiirSync和NonfairSync的区别,循序渐进。
通过上面的对比图可知,公平模式比非公平模式多了一个判断语句:hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
通过方法名和实现可以看出来,这是判断thread等待队列是否有后继节点。这也与公平锁的定义一致。
可重入锁与不可重入锁
之前有提到过可重入锁,但是没有说过它具体是什么含义。
可重入锁指的是,锁以线程为单位,同一个线程内,多次访问同一个加锁块,是不会被阻塞的。而不可重入锁,是以调用为单位,无论是不是在同一个线程,只要有另一个调用在访问这个加锁的块,当前调用就需要阻塞。
JDK中,只实现了可重入锁,没有实现不可重入锁,应该是因为不可重入锁适用范围窄,而且容易死锁,难以使用。
但是netty中实现了不可重入锁,不可重入锁的实现更简单,只是简单的继承了AQS,然后做了一点工作。我们先来对比一下这两者的差异: 通过上图可以看出来,两者的差距在与一个叫state的东西,在可重入锁中,state是一个计数器,当前同一个线程访问这个代码片段时,state+1。而在不可重入锁中,state只是一个标志位。
独占锁与共享锁
JDK中提供了ReentrantReadWriteLock,也叫读写锁。
读写锁存在的意义是,如果一个系统,写请求少,读请求多,那么如果每次读都加锁的话,严重影响性能了。因为写请求少,本来只需要对写写、写读进行隔离就行,读读是不受影响的。所以出现了读写锁。
ReentrantReadWriteLock的构造函数如下:
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
可以看到,内部有两个锁,一个是读锁,一个是写锁。而这两个锁,共用了同一个sync,共用sync,是因为读和写是排斥的,需要了解对方的信息。
那到底是如何使用同一个sync来实现读写分离的呢?关键还在与state这个字段,前面在将可重入、不可重入的时候已经说过了,在可重入锁中,state是一个计数器,不可重入锁中,state是一个标志位。在读写锁中,state的作用又变了,跟之前完全是两码事了!
state是一个int类型,32位。在读写锁中,高16位表示读状态(计数器),低16位表示写状态。将到这里,有些读者应该有所感觉了。
在ReentrantReadWriteLock中,实现了tryAcquire和tryAcquireShared两个方法,分别是用在写锁和读锁的获取上。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && // 获取独占锁的个数,如果独占锁的个数是0,或者是本线程(可重入)在操作,那就可以继续获取锁了。
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { // 如果没有其他读写,则继续获取锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())// 如果有其他读操作或者有其他线程在操作,则无法获取锁。
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
AQS
经过分析ReentrantLock等的实现,我们已经对AQS是什么有了一个隐约的轮廓:
- AQS里面有一个队列,用来存排队的线程,可以在这个队列中胜出的线程,就可以获取到锁。
- AQS里面有一个state字段,这个字段在可重入锁中充当的是计数器角色,表示当前线程重复获取了多少次锁。在不可重入锁中充当的是标志位,表示是否有其他操作获取过锁。在读写锁中,高16位是读计数器,低16位是写计数器。
- AQS里面有一个exclusiveOwnerThread字段,表示当前是哪个线程再获取独占锁。
除此之外,这里还要总结一下:
- 队列的入队和出队都用了CAS工具unsafe.compareAndSwapObject,也就是自选锁。
- state的设置也是使用CAS实现的。
- 线程的阻塞和释放使用了LockSupport.park和LockSupport.unpark
Condition
除了有一个同步队列之外,AQS里面还有一个ConditionObject类,这个是配合Condition使用的。ConditionObject里面也有一个队列。Condition用来两个线程之间通讯用的工具,具体使用方式如下:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
...
lock.lock();
try {
while ( 某条件A ) {
condition.await();
}
// 条件A满足时应执行的代码
} finally {
lock.unlock();
}
ConditionObject实现了java.util.concurrent.locks.Condition接口,表示自己是一个条件队列。作为内部类,它没有被static关键字修饰,表示它不能脱离外部的AQS类独立存在,必须与外部类AQS实例建立关联。这与我们实际使用可重入锁也是一致的。Condition属于某个锁。
Condition有两个核心方法:await(), signal(),这里提一嘴在Java原生体系中ReentrantLock对应的是synchronized,而Condition的await、signal对应的是Object的wait、notify。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
这里分析一下await步骤:
- 相应中断
- 将当前线程放入条件队列(ConditionObject维持的一个队列,与AQS中的同步队列不是一个)
- 释放当前线程
- 然后睡觉-被唤醒-睡觉-被唤醒-睡觉-被唤醒如此循环,直到发现当前线程已经处于同步队列了,说明自己已经获取到锁了。
- 后面就是清理所状态
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
下面分析signal步骤:
- 从条件队列头部开始,找到第一个未取消等待的线程,并把线程由条件等待状态改为就绪状态
- 条件队列取下来放入锁队列
- 把这个线程用CAS操作设为等待锁状态
CountDownLatch
CountDownLatch的代码比较简单,与其他动辄三四千行的大类来说,300多行代码可谓是小巧玲珑了,所以在大体了解了AQS的原理之后,了解CountDownLatch的原理也比较简单。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
通过上面的代码可知CountDownLatch中,state也是一个倒计数的工具。
CountDownLatch有两个核心方法:await、countDown
在调用await之后:
- sync的acquireSharedInterruptibly会被调用
- 在acquireSharedInterruptibly中,把当前线程加入同步队列。
- 进入循环:判断是否state已经被减到0,如果被减到0,则返回,否则睡觉
- 这里是被谁唤醒的呢?
是被countdown唤醒的。具体不再分析了,可以参考后面的参考文章。
其他
至此,我们学习了Java中,如何使用AQS实现可重入锁、不可重入锁、公平锁、非公平锁、读写锁、CountDownLatch。
除此之外,AQS还是实现线程池Worker、Semaphore等的核心工具。关于线程池,后面会单独写一篇文章。