LockSupport接口提供了线程阻塞和唤醒方法,其中唤醒机制在官方文档中提到一些tricks,本文对其加以实例讨论验证。另对官方案例深入分析,做出一些思考讨论
阻塞指定线程 park
public static void park(Object blocker) public static void parkNanos(Object blocker, long nanos) public static void parkUntil(Object blocker, long deadline)
park()
将当前线程阻塞在park()代码行,三种情况会被唤醒继续执行:
1)其他线程调用unpark(currentThread)唤醒当前线程
2)当前线程被其他线程 打断,但是仅唤醒不抛InterruptedException。可在park()方法后运行Thread.interrupted()检测唤醒是否由于中断引起
3)没有任何理由自己唤醒(虚假唤醒)。为避免虚假唤醒,应将park()方法置于循环中
park(Object blocker)
将正在执行blocker对象代码的当前线程阻塞在park()代码行,被唤醒的三种情况同park(),线程dump时会显示阻塞对象blocker
parkNanos(long nanos)
将当前线程阻塞在parkNanos代码行处,若线程阻塞时长在(0,nanos)区间未被上述三种情况唤醒,则在阻塞时长达到nanos时自唤醒,避免永久等待
parkUntil(long deadline)
将当前线程阻塞在parkUntil代码行处,若线程在绝对时间deadline前未被上述三种情况唤醒,则在绝对时间deadline后自唤醒,避免永久等待
唤醒指定线程 unpark
public static void unpark(Thread thread)
唤醒阻塞在park()代码行的线程继续向下执行,有如下几点细节需注意
每个unpark()唤醒一个线程的park()
一般park()在先,unpark()在后,成对使用
public static void main(String[] args) {
final Thread t1 = new Thread(() -> {
long start, end;
System.out.println("time before park : " + (start = System.currentTimeMillis()));
LockSupport.park();
System.out.println("time after park : " + (end = System.currentTimeMillis()));
System.out.println("t1 is unpark by t2, time for waiting : " + (end - start));
});
t1.start();
new Thread(() -> {
try {
Thread.sleep(5000);
LockSupport.unpark(t1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
time before park : 1664120513742 time after park : 1664120518748 t1 is unpark by t2, time for waiting : 5006
多余的unpark()使活跃线程的下次park()失效
此处在线程启动之后、park()之前执行的unpark(),使得本次park()失效
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
Thread t = new Thread(() -> {
long start, end;
System.out.println(Thread.currentThread().getName() + (start = System.currentTimeMillis()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.park();
System.out.println(Thread.currentThread().getName() + (end = System.currentTimeMillis()));
System.out.println("this park is unabled by last unpark, margin millis is " + (end - start));
});
t.start();
LockSupport.unpark(t);
System.out.println(Thread.currentThread().getName() + System.currentTimeMillis());
}
main1664115567502 Thread-01664115567532 main1664115567532 Thread-01664115572542 this park is unabled by last unpark, margin millis is 5010
park之前的多次unpark()等价于一次unpark()
如下例中park()之前的三次unpark()仅可成功消解一次park()
public static void main(String[] args) {
Thread t = new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LockSupport.park();
System.out.println("unpark success 1");
LockSupport.park();
System.out.println("unpark success 2");
LockSupport.park();
System.out.println("unpark success 3");
});
t.start();
LockSupport.unpark(t);
LockSupport.unpark(t);
LockSupport.unpark(t);
}
unpark success 1
线程启动之前的unpark()不对线程park()生效
public static void main(String[] args) {
Thread t = new Thread(() -> {
LockSupport.park();
System.out.println("unpark before thread start works");
});
LockSupport.unpark(t);
t.start();
}
线程被阻塞未输出任何结果
官方实例解析
FIFOMutex是公平锁:线程入队成为队列节点,只允许队头线程获取锁,因而保证了按排队顺序获取锁
代码详细分析
本锁是AQS中ReetrantLock公平锁的简化版实现,原理图类似,可参考相关文章
class FIFOMutex {
// locked状态:false为锁空闲,true为锁占有
private final AtomicBoolean locked = new AtomicBoolean(false);
// 并发安全的队列
private final Queue<Thread> waiters = new ConcurrentLinkedQueue<Thread>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
// 当前线程进队列
waiters.add(current);
// 1. 当前线程不是队首,直接挂起
// 2. 当前队首线程由于上个队首线程占有锁,CAS竞争锁失败挂起
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
// 线程恢复运行后判断是否是由于被打断而唤醒
if (Thread.interrupted())
wasInterrupted = true;
}
// 当前线程是队首且CAS竞争锁成功出队列放行,执行完lock()方法后进入锁区
waiters.remove();
// 恢复线程打断处理
if (wasInterrupted)
current.interrupt();
}
public void unlock() {
// 执行此方法的线程必是自旋竞争锁成功的线程,其他线程均在lock阶段park阻塞
locked.set(false);
// 唤醒队头线程
LockSupport.unpark(waiters.peek());
}
}
为什么要使用并发安全的队列
手写个简单队列,以入队操作为例说明并发不安全的后果
public class SimpleQueue {
Node head = new Node(-1);
Node tail = head;
static class Node {
Node next;
int name;
public Node(int name) {
this.name = name;
this.next = null;
}
}
public void add(int e) {
Node node = new Node(e);
tail.next = node;
tail = tail.next;
}
public void print() {
Node cur = head;
while (cur != null) {
System.out.println(cur.name + " ");
cur = cur.next;
}
}
public static void main(String[] args) throws InterruptedException, IOException {
SimpleQueue q = new SimpleQueue();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 100000; i += 2) {
q.add(i);
}
});
Thread t2 = new Thread(() -> {
for (int i = 1; i < 100000; i += 2) {
q.add(i);
}
});
t1.start(); t2.start();
// main线程等待t1、t2执行完毕后再执行输出
t2.join(); t1.join();
q.print();
}
}
javap -c(反编译class) -l(行号和本地变量表) -v(附加信息) 得到上述代码的编译字节码,过滤出add()相关内容
Constant pool: #2 = Class #74 // copy/SimpleQueue$Node #3 = Methodref #2.#75 // copy/SimpleQueue$Node."<init>":(I)V #5 = Fieldref #16.#77 // copy/SimpleQueue.tail:Lcopy/SimpleQueue$Node; #6 = Fieldref #2.#78 // copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node; public void add(int); descriptor: (I)V flags: ACC_PUBLIC Code: stack=3, locals=3, args_size=2 0: new #2 // class copy/SimpleQueue$Node 创建SimpleQueue.Node实例 3: dup // 复制栈顶引用压入操作数栈 4: iload_1 // 将局部变量int e压入操作数栈 5: invokespecial #3 // Method copy/SimpleQueue$Node."<init>":(I)V SimpleQueue.Node构造方法 8: astore_2 // 将引用node从操作数栈顶出栈,存到局部变量表,对应Node node = new Node(e)的引用赋值 9: aload_0 // 将引用类型this读入操作数栈 10: getfield #5 // Field tail:Lcopy/SimpleQueue$Node; 获取对象成员引用实例this.tail入操作数栈 13: aload_2 // 将引用类型局部变量node读入操作数栈 14: putfield #6 // Field copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node;设置对象成员引用类型tail.next为操作数栈顶数据node 17: aload_0 // 将引用类型this读入操作数栈 18: aload_0 // 将引用类型this读入操作数栈 19: getfield #5 // Field tail:Lcopy/SimpleQueue$Node; 读取this.tail入操作数栈 22: getfield #6 // Field copy/SimpleQueue$Node.next:Lcopy/SimpleQueue$Node; 读取this.tail.next入操作数栈 25: putfield #5 // Field tail:Lcopy/SimpleQueue$Node; 设置this.tail为操作数栈顶数据this.tail.next 28: return LineNumberTable: line 27: 0 line 28: 9 line 29: 17 line 30: 28 LocalVariableTable: Start Length Slot Name Signature 0 29 0 this Lcopy/SimpleQueue; 0 29 1 e I 9 20 2 node Lcopy/SimpleQueue$Node;
观察上述字节码,对共享变量tail的 读-改-写 不是原子的,多个线程添加的过程中,每个线程执行到的字节码行号是不同的,这就可能导致丢失修改,进而导致空指针异常,如下图分析(图中堆内存是共享内存,线程私有内存区域是单核多级高速缓存,线程私有的局部变量表存储的是堆内存对象的引用地址、基本类型数值。此处无需考虑可见性问题:getfield指令读取堆内存到私有内存,putfield指令将私有内存直接写入共享内存,无缓存不一致现象)
并发安全的FIFO为什么还要CAS
我们的真实目的是实现并发控制,即 获得锁的线程进入临界区代码,竞争锁失败的线程要阻塞禁止进入,所以需要线程CAS获取锁来决定是否放行
而我们只是想用FIFO队列来控制线程使用临界区代码的顺序,并发安全是针对队列入队出队操作的并发安全而言
如果不加CAS限制,每个队首线程都将执行完lock()代码进入临界区,失去了锁的作用
为什么要park()
控制线程暂停,限制同时自旋的线程个数:如果非队首线程不park()阻塞,所有非队首节点就会一直自旋获取锁造成CPU空转,造成不必要的性能开销
参考文献
LockSupport – Java™ Platform Standard Ed. 8
翟陆续, 薛宾田. Java并发编程之美[M]. 1. 电子工业出版社, 2018.
方腾飞, 魏鹏, 程晓明. Java并发编程的艺术[M]. 1. 机械工业出版社, 2015.