AQS源码完全解析

本文基于JDK1.8的AbstractQueuedSynchronizer源码进行原理解析,欢迎讨论和错误指正

理解并发编程

独立执行,进度随机

每个线程执行相同的代码,但是各自独立执行、进度(线程私有的当前执行代码行号PC)不同。由于并行线程执行速度的不确定,并发线程切换时机的不确定,导致任一时刻每个线程的执行进度是不可准确预测的,这就意味着 线程T执行到某一行时,其他线程T’可能执行到整个代码逻辑的任意一个位置,站在线程T视角可以看到共享变量被T’修改的状态,这是理解AQS代码中一些费解的条件语句的核心,此时最好的办法是模拟另一个线程执行全部流程复现状态

私有数据安全,不可变数据安全,共享变量不安全

每个线程执行方法时拥有自己的数据(局部变量保存在线程私有的虚拟机栈帧中),更改数据时更改的是自己持有的变量,不涉及并发安全问题

常量不涉及更改,不涉及并发安全问题

共享数据类成员(存储在堆区)、静态变量(存储在方法区),

线程对一个变量的修改背后可能是多条机器指令,而

锁全局生效

锁相当于一个全局变量,是执行加锁和解锁之间代码块(本文简称为锁区)的准入条件:即使获得锁的线程被切换,新获执行权的线程始终在尝试获取锁,滞留在加锁阶段而无法进入执行锁区代码

AQS实现原理

整体流程

线程先自己tryAcquire()竞争一次锁,竞争成功则直接进入锁区,竞争失败就acquireQueued()加入AQS队列并自旋两次tryAcquire()再次竞争锁,若自旋时获得锁则出队列进入锁区,自旋时仍未竞争到锁则被LockSupport.park()挂起:等待排到队首且锁被释放时被唤醒。线程释放锁时唤醒队列中的等待线程出队参与重新竞争

数据结构

AbstractQueuedSynchronizer抽象类维护了一个双向队列,前驱节点(prev、pred)保存后继结点(next)的等待状态,队列节点中保存获取锁失败的线程

设计模式

采用模板方法模式定义模板方法来规定线程竞争锁失败时入队列acquireQueued()、线程释放锁时唤醒后继线程unparkSuccessor()、线程竞争锁成功时出队列acquireQueued()的流程,而线程对锁的获取和释放逻辑由其子类来实现

子类实现

AbstractQueuedSynchronizer中规定线程竞争的锁实际是volatile变量state,获取锁和释放锁实际是对state值的CAS操作。子类覆盖获取锁方法tryAcquire()、释放锁方法tryRelease()来自定义对state的操作,因而获得不同行为特征的锁:如ReetrantLock定义state=0为锁空闲,state>0为锁占有,获取锁成功state++,释放锁成功state–;因允许获取锁的线程再次获取锁(state>1)因而被称为可重入锁

Contition支持

类设计图

实现细节逐行解析

类构造器

类构造器委派锁实现类,无参默认非公平锁,有参按参数指定锁

public ReentrantLock() {
    sync = new NonfairSync();
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

lock()方法实现

ReentrantLock实现了Lock接口中的lock()方法,委派给类构造器传入的实例sync.lock()实现方法

@Override
public void lock() {
    sync.lock();
}

内部类Sync的lock()抽象方法由子类FairSync和NonfairSync实现

abstract static class Sync extends AbstractQueuedSynchronizer {
    abstract void lock();
}

NonfairSync.lock()

非公平锁实现:
state=0锁可用,state=1锁占用
CAS锁竞争成功则设置,设置锁的持有者为当前线程
若获取锁失败,使用AQS获取锁

static final class NonfairSync extends Sync {
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
}

AQS的模板方法aquire(),规定获取锁的规则

public abstract class AbstractQueuedSynchronizer {
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&                             // 子类实现获取锁的规则
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 获取锁失败进入队列等待
            selfInterrupt();                                // 
    }
}

AQS模板方法的自定义实现:NonfarSync子类实现获取锁方法tryAcquire(),委托给Sync实现方法nonfairTryAcquire()

static final class NonfairSync extends Sync {
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

非公平获取锁,线程无需排队重新竞争锁,并返回竞争结果

abstract static class Sync extends AbstractQueuedSynchronizer {
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {                                           // 锁暂无持有人时CAS竞争锁,获取锁成功后设置持有线程
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {        // 锁被持有时看看是不是线程自己持有,若是则线程安全地增加锁被持有次数state
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;                                           // 既未竞争成功也不是重复获取锁,获取锁失败
    }
}

AQS队列添加节点操作,无其他线程操作队尾则CAS插入队尾,否则循环CAS直到插入队尾成功

public abstract class AbstractQueuedSynchronizer {
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // 将当前线程包装入Node节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;                               // oldTail <- node
            if (compareAndSetTail(pred, node)) {            // tail=pred时CAS设置tail=node
                pred.next = node;                           // oldTail -> node
                return node;                                // CAS插入队尾成功
            }
        }
        enq(node);                                          // 插入队尾失败则循环CAS
        return node;
    }
}

将获取锁失败的节点循环CAS插入队尾,头节点是空节点,队中第一个节点是head.next

public abstract class AbstractQueuedSynchronizer {
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {                        // 队中无节点,初始化一个头节点
                if (compareAndSetHead(new Node()))  // head=null时CAS设置head=new Node()头节点
                    tail = head;
            } else {                                // 队中有节点,循环CAS直至插入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
}

tryAquire()失败的线程包装一个节点加入队列,并不断循环看自己是不是队首,如果是队首自旋抢锁,非队首线程直接挂起

public abstract class AbstractQueuedSynchronizer {
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {             // node.prev=head时,node就是队首,之前无线程等待,队首重新去竞争锁,即使切换线程,其他节点一直在外面绕进不来
                    setHead(node);                              // 队首竞争锁成功,仅有当前线程进入此代码块,设置node为新头节点,清空node中保存的线程,等价于线程出队列
                    p.next = null;                              // p.next已经可以回收,不必等到回收p时再回收
                    failed = false;                             // 设置竞争锁成功标志
                    return interrupted;                         // finally执行完后返回线程打断结果,由外部处理
                }
                if (shouldParkAfterFailedAcquire(p, node) &&    // node不是队首节点或者队首线程获取锁失败
                    parkAndCheckInterrupt())                    // 挂起当前线程,并判断当前线程是否被打断
                    interrupted = true;                         // 当前线程被打断仅记录结果
            }
        } finally {
            if (failed)                                         // 线程被打断?fail未改变
                cancelAcquire(node);                            // 取消获取的锁
        }
    }
}
public abstract class AbstractQueuedSynchronizer {
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;
 
        node.thread = null;
 
        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
 
        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;
 
        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;
 
        if (node == tail && compareAndSetTail(node, pred)) {    // node为末尾节点时,tail设为node.prev
            compareAndSetNext(pred, predNext, null);            // pred.next设为null
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
 
            node.next = node; // help GC
        }
    }
}

如果当前线程获取锁失败,且node节点在队列中时持有线程被打断,最终打断该线程

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

之前还有节点pred未出队,node还在队中不时队首,

public abstract class AbstractQueuedSynchronizer {
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)  // ?
            return true;
        if (ws > 0) {           // waitStatus=CANCELED,依次删除前一个被取消的节点
            do {
                node.prev = pred = pred.prev;   // pred.prev <--[pred]-- node
            } while (pred.waitStatus > 0);
            pred.next = node;                   // pred.prev <==> node
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // pred.waitStatus=ws未被改变则CAS设置pres.waitStatus=Node.SIGNAL
        }
        return false;
    }
}

waitStatus的四种取值

public abstract class AbstractQueuedSynchronizer {
    static final class Node {
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    }
}

挂起当前线程,在调用release时waitStatus被设置为0,unpark()唤醒后重新参与到acquireQueued()中的循环CAS和其他线程一起竞争锁,若竞争失败则重新加入队列休眠

public abstract class AbstractQueuedSynchronizer {
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);         // 线程阻塞
        return Thread.interrupted();    // 唤醒时返回休眠状态
    }
}

FairSync.lock()

static final class FairSync extends Sync {
    final void lock() {
        acquire(1);
    }
}

unlock()方法实现

public class ReentrantLock{
    public void unlock() {
        sync.release(1);
    }
}
public abstract class AbstractQueuedSynchronizer {
    public final boolean release(int arg) {
        if (tryRelease(arg)) {                      // 当前线程先释放持有锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);                 // 锁空闲时唤醒AQS中的队首(head.next)
            return true;                            // 当前锁已空闲
        }
        return false;                               // 锁仍然被当前线程持有
    }
}
public abstract class ReentrantLock {
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;                              // 释放锁时减少持有数量
        if (Thread.currentThread() != getExclusiveOwnerThread())    // 当前线程只能释放当前线程持有的锁,实际上其他线程还滞留在lock.lock()方法中
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {                                               // 锁空闲时重置相关变量
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);                                                // 拥有锁的线程才可执行此部分,线程安全地更新持有数量
        return free;                                                // 当前线程可能在释放一定数量后state≠0仍然持有锁
    }
}
public abstract class AbstractQueuedSynchronizer {
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;                   // node节点的ws保存的是node.next的状态
        if (ws < 0)                                 // ws=-1阻塞,设为默认状态ws=0唤醒
            compareAndSetWaitStatus(node, ws, 0);
 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {        // s.ws>0说明node.next.next是cancel节点
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev) // 依次从后向前遍历,将s定位到cancel节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)                              // 唤醒node.next阻塞线程,返回到parkAndCheckInterrupt()
            LockSupport.unpark(s.thread);
    }
}

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

©2018-2024 Howell版权所有 备案号:冀ICP备19000576号