并发工具类 – 线程同步和控制

Semaphore

  • 应用
    • 限制访问共享资源的线程数目
  • 原理说明
    • 共享资源池有permits个门票(假定permits只允许逐个申请逐个归还),每申请门票时permits–,permits<0无盈余准入资格,此时申请线程进入permits的阻塞队列,|permits|即阻塞队列中线程的数量。每归还门票时permits++,permits≤0有线程正在等待,此时唤醒阻塞队列中的等待线程获取此次归还的准入资格
  • 构造函数
Semaphore(int permits) 指定准入资格数量,默认非公平锁
Semaphore(int permits, boolean fair) 指定准入资格数量,可指定公平锁
  • 常用API
void acquire() 申请准入资格,无可用则阻塞,直到有可用资源或线程被打断被唤醒
void release() 归还准入资格,若有则唤醒阻塞队列中的等待线程
  • 使用方法
private static final int NUM = 100;
private static final Semaphore sem = new Semaphore(NUM);
try{
    sem.acquire();
    //资源池线程安全的原子取操作
}catch(InterruptedExceptione){
    e.printStackTrace();
}finally{
    //资源池线程安全的原子归还操作
    sem.release();
}
  • 注意事项
    • 使用信号量时允许多个线程访问共享资源,很可能导致并发问题,所以应确保对共享资源线程安全的原子操作

CountDownLatch

  • 应用
    • 控制一组线程同步进度,等待所有线程都执行完毕才继续向下执行,常用于并行计算的划分和整合
    • 控制某个线程操作N次后继续向下执行
  • 原理说明
    • 初始需要同步进度的线程数count。每个线程执行完自己的逻辑后count–,先执行完的线程count≠0,说明有其他线程未执行完毕,原地阻塞等待。直到所有线程执行完毕count=0,此时唤醒所有其他线程继续向下执行
  • 构造函数
CountDownLatch(int count)
  • 常用API
void countdown() 执行自减,到0后唤醒所有等待线程
void await() 当前线程等待进度到0
  • 使用方法
    • 官方案例使用两个CDL来相互同步等待:所有worker需要等待driver执行完doPrepare()后才可执行,driver需要等待所有worker执行完毕才可继续向下执行doCollect()
classDriver{
    void main() throws InterruptedException{
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);
        for(int i=0; i<N; ++i)
            new Thread(new Worker(startSignal, doneSignal)).start();
        doPrepare();
        startSignal.countDown();
        doSomethingElse();
        doneSignal.await();
        doCollect();
    }
}
class Worker implements Runnable{
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal){
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    public void run(){
        try{
        startSignal.await();
        doWork();
        doneSignal.countDown();
        }catch(InterruptedException ex){...}
    }
    void doWork(){...}
}
  • 注意事项
    • 每次使用CDL需要重置初始值
    • 任一线程在await()过程中设置interrupt标志或被其他线程打断抛出InterruptedException,此时必须终止CountDownLatch过程,否则其他线程将一直等待下去
    • countdown()调用happen-before于await()调用

CyclicBarrier

  • 应用和原理
    • 同CountDownLatch
  • 构造函数
CyclicBarrier(int parties) parties个线程等待实例时放行各线程继续向下执行,并开启新一轮计数等待
CyclicBarrier(int parties, Runnable barrierAction) parties个线程等待实例时,由最后一个进入barrier实例的线程同步执行barrierAction函数,函数执行完毕放行各线程继续向下执行,并开启新一轮计数等待
  • 常用API
int await() 当前线程处于等待状态,直到所有线程都处于等待状态放行。倒序返回索引显示当前进度,可指定执行到某进度的线程打印信息或执行不依赖各线程的barrierAction函数
  • 使用方法
    • 官方案例展现了一个可划分的并行任务:N个线程并行,各自处理各自的矩阵行。当且仅当所有线程执行完毕时,由最后一个进入barrier的线程执行mergeRows()合并N个线程的计算结果,合并完毕后改变done()返回值,并放行各线程继续向下执行。各线程继续循环时done()退出循环,等到所有线程执行完毕后终止
class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;
    public Solver(float[][] matrix) {
        data = matrix;
        N = matrix.length;
        Runnable barrierAction = () -> mergeRows(...);
        barrier = new CyclicBarrier(N, barrierAction);
        List<Thread> threads = new ArrayList<>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads)
            thread.join();
    }

    class Worker implements Runnable {
        int myRow;
        Worker(int row) {
            myRow = row;
        }
        public void run() {
            while (!done()) {
                processRow(myRow);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }
    }
}
  • 注意事项
    • CyclicBarrier每次放行后自动重置计数为初始值parties
    • 如果一个线程在await()时发生中断InterruptedException,故障BrokenBarrierException,超时TimeoutException,其他所有await()线程都将触发相应异常
    • 各线程逻辑happen-before于await()调用

CountDownLatch和CyclicBarrier比较

  • 相同点
    • 计数原理相同,都可用于线程之间的等待以达到步调一致
  • 不同点
    • CDL放行后计数不重置,CB放行后计数自动重置
    • 使用场景,CDL更多用于一个线程等多个线程,CB多用于线程之间相互等待
    • 回调函数,CB可以在步调一致后执行一个回调函数,CDL无
    • await返回值,CDL无进度返回值,CB提供了进度返回值
    • 异常处理机制,CDL仅在线程await()时触发InterruptedException,且中断若不处理其他线程将永远等下去;CB只要有一个线程发生中断、超时、故障将抛出对应的异常并结束等待过程
参考资料

CountDownLatch(JavaSE11&JDK11)
CyclicBarrier(JavaSE11&JDK11)
Semaphore(JavaSE11&JDK11)

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

©2018-2024 Howell版权所有 备案号:冀ICP备19000576号