LockSupport – 线程阻塞和唤醒机制细节

LockSupport接口提供了线程阻塞和唤醒方法,其中唤醒机制在官方文档中提到一些tricks,本文对其加以实例讨论验证。另对官方案例深入分析,做出一些思考讨论

阻塞指定线程 park

public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline)

park()

将当前线程阻塞在park()代码行,三种情况会被唤醒继续执行:
1)其他线程调用unpark(currentThread)唤醒当前线程
2)当前线程被其他线程 打断,但是仅唤醒不抛InterruptedException。可在park()方法后运行Thread.interrupted()检测唤醒是否由于中断引起
3)没有任何理由自己唤醒(虚假唤醒)。为避免虚假唤醒,应将park()方法置于循环中

park(Object blocker)

将正在执行blocker对象代码的当前线程阻塞在park()代码行,被唤醒的三种情况同park(),线程dump时会显示阻塞对象blocker

parkNanos(long nanos)

将当前线程阻塞在parkNanos代码行处,若线程阻塞时长在(0,nanos)区间未被上述三种情况唤醒,则在阻塞时长达到nanos时自唤醒,避免永久等待

parkUntil(long deadline)

将当前线程阻塞在parkUntil代码行处,若线程在绝对时间deadline前未被上述三种情况唤醒,则在绝对时间deadline后自唤醒,避免永久等待

唤醒指定线程 unpark

public static void unpark(Thread thread)

唤醒阻塞在park()代码行的线程继续向下执行,有如下几点细节需注意

每个unpark()唤醒一个线程的park()

一般park()在先,unpark()在后,成对使用

public static void main(String[] args) {
    final Thread t1 = new Thread(() -> {
        long start, end;
        System.out.println("time before park : " + (start = System.currentTimeMillis()));
        LockSupport.park();
        System.out.println("time after park : " + (end = System.currentTimeMillis()));
        System.out.println("t1 is unpark by t2, time for waiting : " + (end - start));
    });
    t1.start();

    new Thread(() -> {
        try {
            Thread.sleep(5000);
            LockSupport.unpark(t1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }).start();
}
time before park : 1664120513742
time after park : 1664120518748
t1 is unpark by t2, time for waiting : 5006

多余的unpark()使活跃线程的下次park()失效

此处在线程启动之后、park()之前执行的unpark(),使得本次park()失效

public static void main(String[] args) {
    System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
    Thread t = new Thread(() -> {
        long start, end;
        System.out.println(Thread.currentThread().getName() + (start = System.currentTimeMillis()));
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        LockSupport.park();
        System.out.println(Thread.currentThread().getName() + (end = System.currentTimeMillis()));
        System.out.println("this park is unabled by last unpark, margin millis is " + (end - start));
    });
    t.start();
    LockSupport.unpark(t);
    System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
}
main1664115567502
Thread-01664115567532
main1664115567532
Thread-01664115572542
this park is unabled by last unpark, margin millis is 5010

park之前的多次unpark()等价于一次unpark()

如下例中park()之前的三次unpark()仅可成功消解一次park()

public static void main(String[] args) {
    Thread t = new Thread(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        LockSupport.park();
        System.out.println("unpark success 1");
        LockSupport.park();
        System.out.println("unpark success 2");
        LockSupport.park();
        System.out.println("unpark success 3");
    });
    t.start();
    LockSupport.unpark(t);
    LockSupport.unpark(t);
    LockSupport.unpark(t);
}
unpark success 1

线程启动之前的unpark()不对线程park()生效

public static void main(String[] args) {
    Thread t = new Thread(() -> {
        LockSupport.park();
        System.out.println("unpark before thread start works");
    });
    LockSupport.unpark(t);
    t.start();
}

线程被阻塞未输出任何结果


官方实例解析

FIFOMutex是公平锁:线程入队成为队列节点,只允许队头线程获取锁,因而保证了按排队顺序获取锁

代码详细分析

本锁是AQS中ReetrantLock公平锁的简化版实现,原理图类似,可参考相关文章

class FIFOMutex {
    // locked状态:false为锁空闲,true为锁占有
    private final AtomicBoolean locked = new AtomicBoolean(false);
    // 并发安全的队列
    private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();

    public void lock() {
        boolean wasInterrupted = false;
        Thread current = Thread.currentThread();
        // 当前线程进队列
        waiters.add(current);

        // 1. 当前线程不是队首,直接挂起
        // 2. 当前队首线程由于上个队首线程占有锁,CAS竞争锁失败挂起
        while (waiters.peek() != current ||
               !locked.compareAndSet(false, true)) {
            LockSupport.park(this);
            // 线程恢复运行后判断是否是由于被打断而唤醒
            if (Thread.interrupted())
                wasInterrupted = true;
        }

        // 当前线程是队首且CAS竞争锁成功出队列放行,执行完lock()方法后进入锁区
        waiters.remove();
        // 恢复线程打断处理
        if (wasInterrupted)
            current.interrupt();
    }

    public void unlock() {
        // 执行此方法的线程必是自旋竞争锁成功的线程,其他线程均在lock阶段park阻塞
        locked.set(false);
        // 唤醒队头线程
        LockSupport.unpark(waiters.peek());
    }
}

为什么要使用并发安全的队列

手写个简单队列,以入队操作为例说明并发不安全的后果

public class SimpleQueue {
    Node head = new Node(-1);
    Node tail = head;

    static class Node {
        Node next;
        int name;

        public Node(int name) {
            this.name = name;
            this.next = null;
        }

    }

    public void add(int e) {
        Node node = new Node(e);
        tail.next = node;
        tail = tail.next;
    }

    public void print() {
        Node cur = head;
        while (cur != null) {
            System.out.println(cur.name + " ");
            cur = cur.next;
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        SimpleQueue q = new SimpleQueue();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i += 2) {
                q.add(i);
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 1; i < 100000; i += 2) {
                q.add(i);
            }
        });
        t1.start(); t2.start();
        // main线程等待t1、t2执行完毕后再执行输出
        t2.join(); t1.join();
        q.print();
    }
}

javap -c(反编译class) -l(行号和本地变量表) -v(附加信息) 得到上述代码的编译字节码,过滤出add()相关内容

Constant pool:
    #2 = Class              #74         // copy/SimpleQueue$Node
    #3 = Methodref          #2.#75      // copy/SimpleQueue$Node."<init>":(I)V
    #5 = Fieldref           #16.#77     // copy/SimpleQueue.tail:Lcopy/SimpleQueue$Node;
    #6 = Fieldref           #2.#78      // copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node;
 
  public void add(int);
    descriptor: (I)V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=3, args_size=2
         0: new           #2            // class copy/SimpleQueue$Node 创建SimpleQueue.Node实例
         3: dup                         // 复制栈顶引用压入操作数栈
         4: iload_1                     // 将局部变量int e压入操作数栈
         5: invokespecial #3            // Method copy/SimpleQueue$Node."<init>":(I)V SimpleQueue.Node构造方法
         8: astore_2                    // 将引用node从操作数栈顶出栈,存到局部变量表,对应Node node = new Node(e)的引用赋值
         9: aload_0                     // 将引用类型this读入操作数栈
        10: getfield      #5            // Field tail:Lcopy/SimpleQueue$Node; 获取对象成员引用实例this.tail入操作数栈
        13: aload_2                     // 将引用类型局部变量node读入操作数栈
        14: putfield      #6            // Field copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node;设置对象成员引用类型tail.next为操作数栈顶数据node
        17: aload_0                     // 将引用类型this读入操作数栈
        18: aload_0                     // 将引用类型this读入操作数栈
        19: getfield      #5            // Field tail:Lcopy/SimpleQueue$Node; 读取this.tail入操作数栈
        22: getfield      #6            // Field copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node; 读取this.tail.next入操作数栈
        25: putfield      #5            // Field tail:Lcopy/SimpleQueue$Node; 设置this.tail为操作数栈顶数据this.tail.next
        28: return
      LineNumberTable:
        line 27: 0
        line 28: 9
        line 29: 17
        line 30: 28
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      29     0  this   Lcopy/SimpleQueue;
            0      29     1     e   I
            9      20     2  node   Lcopy/SimpleQueue$Node;

观察上述字节码,对共享变量tail的 读-改-写 不是原子的,多个线程添加的过程中,每个线程执行到的字节码行号是不同的,这就可能导致丢失修改,进而导致空指针异常,如下图分析(图中堆内存是共享内存,线程私有内存区域是单核多级高速缓存,线程私有的局部变量表存储的是堆内存对象的引用地址、基本类型数值。此处无需考虑可见性问题:getfield指令读取堆内存到私有内存,putfield指令将私有内存直接写入共享内存,无缓存不一致现象)
lose update

并发安全的FIFO为什么还要CAS

我们的真实目的是实现并发控制,即 获得锁的线程进入临界区代码,竞争锁失败的线程要阻塞禁止进入,所以需要线程CAS获取锁来决定是否放行

而我们只是想用FIFO队列来控制线程使用临界区代码的顺序,并发安全是针对队列入队出队操作的并发安全而言

如果不加CAS限制,每个队首线程都将执行完lock()代码进入临界区,失去了锁的作用

为什么要park()

控制线程暂停,限制同时自旋的线程个数:如果非队首线程不park()阻塞,所有非队首节点就会一直自旋获取锁造成CPU空转,造成不必要的性能开销

参考文献

LockSupport – Java™ Platform Standard Ed. 8
翟陆续, 薛宾田. Java并发编程之美[M]. 1. 电子工业出版社, 2018.
方腾飞, 魏鹏, 程晓明. Java并发编程的艺术[M]. 1. 机械工业出版社, 2015.

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

©2018-2024 Howell版权所有 备案号:冀ICP备19000576号