本文基于JDK1.8的AbstractQueuedSynchronizer源码进行原理解析,欢迎讨论和错误指正
理解并发编程
独立执行,进度随机
每个线程执行相同的代码,但是各自独立执行、进度(线程私有的当前执行代码行号PC)不同。由于并行线程执行速度的不确定,并发线程切换时机的不确定,导致任一时刻每个线程的执行进度是不可准确预测的,这就意味着 线程T执行到某一行时,其他线程T’可能执行到整个代码逻辑的任意一个位置,站在线程T视角可以看到共享变量被T’修改的状态,这是理解AQS代码中一些费解的条件语句的核心,此时最好的办法是模拟另一个线程执行全部流程复现状态
私有数据安全,不可变数据安全,共享变量不安全
每个线程执行方法时拥有自己的数据(局部变量保存在线程私有的虚拟机栈帧中),更改数据时更改的是自己持有的变量,不涉及并发安全问题
常量不涉及更改,不涉及并发安全问题
共享数据类成员(存储在堆区)、静态变量(存储在方法区),
线程对一个变量的修改背后可能是多条机器指令,而
锁全局生效
锁相当于一个全局变量,是执行加锁和解锁之间代码块(本文简称为锁区)的准入条件:即使获得锁的线程被切换,新获执行权的线程始终在尝试获取锁,滞留在加锁阶段而无法进入执行锁区代码
AQS实现原理
整体流程
线程先自己tryAcquire()竞争一次锁,竞争成功则直接进入锁区,竞争失败就acquireQueued()加入AQS队列并自旋两次tryAcquire()再次竞争锁,若自旋时获得锁则出队列进入锁区,自旋时仍未竞争到锁则被LockSupport.park()挂起:等待排到队首且锁被释放时被唤醒。线程释放锁时唤醒队列中的等待线程出队参与重新竞争
数据结构
AbstractQueuedSynchronizer抽象类维护了一个双向队列,前驱节点(prev、pred)保存后继结点(next)的等待状态,队列节点中保存获取锁失败的线程
设计模式
采用模板方法模式定义模板方法来规定线程竞争锁失败时入队列acquireQueued()、线程释放锁时唤醒后继线程unparkSuccessor()、线程竞争锁成功时出队列acquireQueued()的流程,而线程对锁的获取和释放逻辑由其子类来实现
子类实现
AbstractQueuedSynchronizer中规定线程竞争的锁实际是volatile变量state,获取锁和释放锁实际是对state值的CAS操作。子类覆盖获取锁方法tryAcquire()、释放锁方法tryRelease()来自定义对state的操作,因而获得不同行为特征的锁:如ReetrantLock定义state=0为锁空闲,state>0为锁占有,获取锁成功state++,释放锁成功state–;因允许获取锁的线程再次获取锁(state>1)因而被称为可重入锁
Contition支持
类设计图
实现细节逐行解析
类构造器
类构造器委派锁实现类,无参默认非公平锁,有参按参数指定锁
public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
lock()方法实现
ReentrantLock实现了Lock接口中的lock()方法,委派给类构造器传入的实例sync.lock()实现方法
@Override public void lock() { sync.lock(); }
内部类Sync的lock()抽象方法由子类FairSync和NonfairSync实现
abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); }
NonfairSync.lock()
非公平锁实现:
state=0锁可用,state=1锁占用
CAS锁竞争成功则设置,设置锁的持有者为当前线程
若获取锁失败,使用AQS获取锁
static final class NonfairSync extends Sync { final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } }
AQS的模板方法aquire(),规定获取锁的规则
public abstract class AbstractQueuedSynchronizer { public final void acquire(int arg) { if (!tryAcquire(arg) && // 子类实现获取锁的规则 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取锁失败进入队列等待 selfInterrupt(); // } }
AQS模板方法的自定义实现:NonfarSync子类实现获取锁方法tryAcquire(),委托给Sync实现方法nonfairTryAcquire()
static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
非公平获取锁,线程无需排队重新竞争锁,并返回竞争结果
abstract static class Sync extends AbstractQueuedSynchronizer { final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 锁暂无持有人时CAS竞争锁,获取锁成功后设置持有线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 锁被持有时看看是不是线程自己持有,若是则线程安全地增加锁被持有次数state int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; // 既未竞争成功也不是重复获取锁,获取锁失败 } }
AQS队列添加节点操作,无其他线程操作队尾则CAS插入队尾,否则循环CAS直到插入队尾成功
public abstract class AbstractQueuedSynchronizer { private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 将当前线程包装入Node节点 Node pred = tail; if (pred != null) { node.prev = pred; // oldTail <- node if (compareAndSetTail(pred, node)) { // tail=pred时CAS设置tail=node pred.next = node; // oldTail -> node return node; // CAS插入队尾成功 } } enq(node); // 插入队尾失败则循环CAS return node; } }
将获取锁失败的节点循环CAS插入队尾,头节点是空节点,队中第一个节点是head.next
public abstract class AbstractQueuedSynchronizer { private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // 队中无节点,初始化一个头节点 if (compareAndSetHead(new Node())) // head=null时CAS设置head=new Node()头节点 tail = head; } else { // 队中有节点,循环CAS直至插入队尾 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } }
tryAquire()失败的线程包装一个节点加入队列,并不断循环看自己是不是队首,如果是队首自旋抢锁,非队首线程直接挂起
public abstract class AbstractQueuedSynchronizer { final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // node.prev=head时,node就是队首,之前无线程等待,队首重新去竞争锁,即使切换线程,其他节点一直在外面绕进不来 setHead(node); // 队首竞争锁成功,仅有当前线程进入此代码块,设置node为新头节点,清空node中保存的线程,等价于线程出队列 p.next = null; // p.next已经可以回收,不必等到回收p时再回收 failed = false; // 设置竞争锁成功标志 return interrupted; // finally执行完后返回线程打断结果,由外部处理 } if (shouldParkAfterFailedAcquire(p, node) && // node不是队首节点或者队首线程获取锁失败 parkAndCheckInterrupt()) // 挂起当前线程,并判断当前线程是否被打断 interrupted = true; // 当前线程被打断仅记录结果 } } finally { if (failed) // 线程被打断?fail未改变 cancelAcquire(node); // 取消获取的锁 } } }
public abstract class AbstractQueuedSynchronizer { private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null; // Skip cancelled predecessors Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED; if (node == tail && compareAndSetTail(node, pred)) { // node为末尾节点时,tail设为node.prev compareAndSetNext(pred, predNext, null); // pred.next设为null } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } }
如果当前线程获取锁失败,且node节点在队列中时持有线程被打断,最终打断该线程
static void selfInterrupt() { Thread.currentThread().interrupt(); }
之前还有节点pred未出队,node还在队中不时队首,
public abstract class AbstractQueuedSynchronizer { private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // ? return true; if (ws > 0) { // waitStatus=CANCELED,依次删除前一个被取消的节点 do { node.prev = pred = pred.prev; // pred.prev <--[pred]-- node } while (pred.waitStatus > 0); pred.next = node; // pred.prev <==> node } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // pred.waitStatus=ws未被改变则CAS设置pres.waitStatus=Node.SIGNAL } return false; } }
waitStatus的四种取值
public abstract class AbstractQueuedSynchronizer { static final class Node { static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; } }
挂起当前线程,在调用release时waitStatus被设置为0,unpark()唤醒后重新参与到acquireQueued()中的循环CAS和其他线程一起竞争锁,若竞争失败则重新加入队列休眠
public abstract class AbstractQueuedSynchronizer { private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 线程阻塞 return Thread.interrupted(); // 唤醒时返回休眠状态 } }
FairSync.lock()
static final class FairSync extends Sync { final void lock() { acquire(1); } }
unlock()方法实现
public class ReentrantLock{ public void unlock() { sync.release(1); } }
public abstract class AbstractQueuedSynchronizer { public final boolean release(int arg) { if (tryRelease(arg)) { // 当前线程先释放持有锁 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // 锁空闲时唤醒AQS中的队首(head.next) return true; // 当前锁已空闲 } return false; // 锁仍然被当前线程持有 } }
public abstract class ReentrantLock { protected final boolean tryRelease(int releases) { int c = getState() - releases; // 释放锁时减少持有数量 if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程只能释放当前线程持有的锁,实际上其他线程还滞留在lock.lock()方法中 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 锁空闲时重置相关变量 free = true; setExclusiveOwnerThread(null); } setState(c); // 拥有锁的线程才可执行此部分,线程安全地更新持有数量 return free; // 当前线程可能在释放一定数量后state≠0仍然持有锁 } }
public abstract class AbstractQueuedSynchronizer { private void unparkSuccessor(Node node) { int ws = node.waitStatus; // node节点的ws保存的是node.next的状态 if (ws < 0) // ws=-1阻塞,设为默认状态ws=0唤醒 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { // s.ws>0说明node.next.next是cancel节点 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 依次从后向前遍历,将s定位到cancel节点 if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒node.next阻塞线程,返回到parkAndCheckInterrupt() LockSupport.unpark(s.thread); } }