AQS源码分析
Table of Contents
概述
AbstractQueueSynchronized抽象队列同步器,Java的并发框架中核心实现。 思考一个问题:如果让你去实现并发控制,你会怎么做?再去对比别人是怎么做的。可以借鉴synchronized的实现。
核心原理
CLH (Craig, Landin and Hagersten) , 是一种基于链表的可扩展,高性能,公平的自旋锁。本质上就是维护了一个state变量和FIFO的线程队列。然后通过CAS的操作去获取state的值,获取到state值就可以执行相关原子操作。 然后封装了一些模版方法,比如说getState, setState, compareAndSet. 然后juc上很多框架,像ReentryLock,Semaphore, CountDownLatch.ReadWriteLock底层都是基于AQS实现的。
两种模式
独占(acquire)
实现子类:ReentrantLock的公平锁和非公平锁
共享(acuqireShared)
实现子类:Semaphore/CountDownLatch。Semaphore、CountDownLatch、 CyclicBarrier、ReadWriteLock
源码解析
核心方法
- accquire (获取独占锁)
- acquireShared(获取共享锁)
- release (先尝试释放锁,然后通知队列第一个节点)
- releaseShared (释放共享锁)
特性方法
- acquireInterruptibly 可中断的方式获取锁
- tryAcquireNanos 可超时获取锁
模版方法
- tryAcquire (需要子类实现具体的获取独占锁的逻辑)
- tryRealse (需要子类实现具体的释放独占锁的逻辑)
- tryAcquireShard(需要子类实现具体的获取共享锁的逻辑)
- tryRequireShard(需要子类实现具体的释放共享锁的逻辑)
acquire方法
acquire的整体逻辑还是比较清晰的
- 想尝试获取,能获取就直接成功了
- 获取失败了,就封装成一个waiter节点,加入到队列中
- 从队列中获取节点
整体流程
源码分析
public final void acquire(int arg) {
//先获取一把,如果失败了,就新增节点,添加到队列中,然后自我中断
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
//新增节点
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队 Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//通过标准的方式入队
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果是head节点,并且获取锁成功,就执行成功了,把自己从队列中移除
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//非head节点或者获取锁失败
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release方法
整体流程
源码分析
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//唤醒下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //节点处于被唤醒的状态
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
插曲
看源码的时候,不小心看到了java17的版本,发现和一些博客的说法不一样,我就很奇怪,原来是 Doug Lea在后续的版本中又对aqs做了优化,把共享,独占的一些获取逻辑合并到一起去了,看起来更复杂了。
acquire(java17的实现)
java17中,aqs的多种方式被合并成了一个核心实现,都在acquire方法中。 不管是用独占,共享,还是指定等待时间,最终我们都会走到accquire方法上来,因为这里是核心的处理逻辑,会根据是否共享模式,是否超时等方式去实现最终的逻辑。
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
//不满足第一个的条件,循环等待
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
//如果前一个节点取消了,就清理队列,然后继续
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
//前一个节点要执行了,当前线程要进入准备状态了
Thread.onSpinWait(); // ensure serialization
continue;
}
}
//终于轮到我为第一个节点了
if (first || pred == null) {
boolean acquired;
try {
//如果是共享的模式,就通过共享模式获取
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
//否则就通过正常模式获取
acquired = tryAcquire(arg);
} catch (Throwable ex) {
//如果出现了异常,就取消获取
cancelAcquire(node, interrupted, false);
throw ex;
}
//如果获取到了,就出队列
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
//如果是共享模式,唤醒一下其他节点
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
//当前节点处理完了
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
//不为空,尝试入队
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
流程图
accquire的的实现逻辑考虑了两个模式,独占和共享模式,以及考虑是否需要超时中断。大致的处理逻辑如下
- 判断自己是否为第一个节点,如果是的话,直接去获取资源,如果获取失败,就取消获取。
- 如果自己不是第一个节点,看看自己是否在队列中,如果不在队列中,就添加到队列中。
- 如果以及在队列中,就看看上一个节点有没有超时,有超时就清理队列,否则就继续自旋
总结
今天的aqs分析就到此结束了。主要分析了核心的独占锁的方式,acquire方法和release方法。让我们对aqs的线程并发的管理有了更近一步的认识。
回顾一下
acquire
- 尝试获取一下
- 获取不到,把自己封装成node节点,先快速加入一把到队尾,失败了就cas加入
- 然后从队列中获取锁
- 如果自己是第一个节点,就获取一把,获取成功了就成功了。
- 如果上一个节点是否在执行了,如果是,就阻塞自己等待上一个执行完唤醒自己。
release
- 直接释放
- 释放成功了,更新自己的等待状态,唤醒下一个节点