Semaphore
- 应用
- 限制访问共享资源的线程数目
- 原理说明
- 共享资源池有permits个门票(假定permits只允许逐个申请逐个归还),每申请门票时permits–,permits<0无盈余准入资格,此时申请线程进入permits的阻塞队列,|permits|即阻塞队列中线程的数量。每归还门票时permits++,permits≤0有线程正在等待,此时唤醒阻塞队列中的等待线程获取此次归还的准入资格
- 构造函数
Semaphore(int permits) 指定准入资格数量,默认非公平锁 Semaphore(int permits, boolean fair) 指定准入资格数量,可指定公平锁
- 常用API
void acquire() 申请准入资格,无可用则阻塞,直到有可用资源或线程被打断被唤醒 void release() 归还准入资格,若有则唤醒阻塞队列中的等待线程
- 使用方法
private static final int NUM = 100;
private static final Semaphore sem = new Semaphore(NUM);
try{
sem.acquire();
//资源池线程安全的原子取操作
}catch(InterruptedExceptione){
e.printStackTrace();
}finally{
//资源池线程安全的原子归还操作
sem.release();
}
- 注意事项
- 使用信号量时允许多个线程访问共享资源,很可能导致并发问题,所以应确保对共享资源线程安全的原子操作
CountDownLatch
- 应用
- 控制一组线程同步进度,等待所有线程都执行完毕才继续向下执行,常用于并行计算的划分和整合
- 控制某个线程操作N次后继续向下执行
- 原理说明
- 初始需要同步进度的线程数count。每个线程执行完自己的逻辑后count–,先执行完的线程count≠0,说明有其他线程未执行完毕,原地阻塞等待。直到所有线程执行完毕count=0,此时唤醒所有其他线程继续向下执行
- 构造函数
CountDownLatch(int count)
- 常用API
void countdown() 执行自减,到0后唤醒所有等待线程 void await() 当前线程等待进度到0
- 使用方法
- 官方案例使用两个CDL来相互同步等待:所有worker需要等待driver执行完doPrepare()后才可执行,driver需要等待所有worker执行完毕才可继续向下执行doCollect()
classDriver{
void main() throws InterruptedException{
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for(int i=0; i<N; ++i)
new Thread(new Worker(startSignal, doneSignal)).start();
doPrepare();
startSignal.countDown();
doSomethingElse();
doneSignal.await();
doCollect();
}
}
class Worker implements Runnable{
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal){
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run(){
try{
startSignal.await();
doWork();
doneSignal.countDown();
}catch(InterruptedException ex){...}
}
void doWork(){...}
}
- 注意事项
- 每次使用CDL需要重置初始值
- 任一线程在await()过程中设置interrupt标志或被其他线程打断抛出InterruptedException,此时必须终止CountDownLatch过程,否则其他线程将一直等待下去
- countdown()调用happen-before于await()调用
CyclicBarrier
- 应用和原理
- 同CountDownLatch
- 构造函数
CyclicBarrier(int parties) parties个线程等待实例时放行各线程继续向下执行,并开启新一轮计数等待 CyclicBarrier(int parties, Runnable barrierAction) parties个线程等待实例时,由最后一个进入barrier实例的线程同步执行barrierAction函数,函数执行完毕放行各线程继续向下执行,并开启新一轮计数等待
- 常用API
int await() 当前线程处于等待状态,直到所有线程都处于等待状态放行。倒序返回索引显示当前进度,可指定执行到某进度的线程打印信息或执行不依赖各线程的barrierAction函数
- 使用方法
- 官方案例展现了一个可划分的并行任务:N个线程并行,各自处理各自的矩阵行。当且仅当所有线程执行完毕时,由最后一个进入barrier的线程执行mergeRows()合并N个线程的计算结果,合并完毕后改变done()返回值,并放行各线程继续向下执行。各线程继续循环时done()退出循环,等到所有线程执行完毕后终止
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction = () -> mergeRows(...);
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
for (Thread thread : threads)
thread.join();
}
class Worker implements Runnable {
int myRow;
Worker(int row) {
myRow = row;
}
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
}
- 注意事项
- CyclicBarrier每次放行后自动重置计数为初始值parties
- 如果一个线程在await()时发生中断InterruptedException,故障BrokenBarrierException,超时TimeoutException,其他所有await()线程都将触发相应异常
- 各线程逻辑happen-before于await()调用
CountDownLatch和CyclicBarrier比较
- 相同点
- 计数原理相同,都可用于线程之间的等待以达到步调一致
- 不同点
- CDL放行后计数不重置,CB放行后计数自动重置
- 使用场景,CDL更多用于一个线程等多个线程,CB多用于线程之间相互等待
- 回调函数,CB可以在步调一致后执行一个回调函数,CDL无
- await返回值,CDL无进度返回值,CB提供了进度返回值
- 异常处理机制,CDL仅在线程await()时触发InterruptedException,且中断若不处理其他线程将永远等下去;CB只要有一个线程发生中断、超时、故障将抛出对应的异常并结束等待过程
参考资料
CountDownLatch(JavaSE11&JDK11)
CyclicBarrier(JavaSE11&JDK11)
Semaphore(JavaSE11&JDK11)