[Java源码][并发J.U.C]—并发工具类CyclicBarrier

作者 : 开心源码 本文共7049个字,预计阅读时间需要18分钟 发布时间: 2022-05-12 共189人阅读

前言

CyclicBarrier要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障阻拦的线程才会继续运行. 简单地说就是人到齐了后才可以让每个人继续去做自己的事情.

CycliBarrier是通过ReentrantLockCondition实现的一个数据结构.

本文代码: 代码下载

例子1

先通过一个简单的例子理解一下CyclicBarrier.

package com.sourcecode.concurrencytools_CyclicBarrier;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTest4 {    static CyclicBarrier c = new CyclicBarrier(5);    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {        for (int i = 0; i < 5; i++) {            Thread thread = new MyThread();            thread.start();        }    }    static class MyThread extends Thread {        @Override        public void run() {            try {                System.out.println(Thread.currentThread().getName() + " tries to wait!");                c.await();            } catch (Exception e) {                System.out.println(e);                //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted());            } finally {                System.out.println(Thread.currentThread().getName() + " finishes!");            }        }    }}

运行结果如下: 初始化CyclicBarrier的时候参数是5,表示需要等待5个线程达到后才可以打开屏障,正如结果所示,thread-0thread-3在等待,到最后一个线程thread-4到达屏障时,此时屏障打开,每个线程执行各自接下来的板块.

假如初始化参数大于5,比方6,此程序将一直阻塞,由于没有第6个线程到达该屏障.

Thread-0 tries to wait!Thread-1 tries to wait!Thread-2 tries to wait!Thread-3 tries to wait!Thread-4 tries to wait!Thread-4 finishes!Thread-1 finishes!Thread-0 finishes!Thread-2 finishes!Thread-3 finishes!

实现思路分析

cyclicbarrier.png

    private static class Generation {      boolean broken = false;    }    /** 重入锁 */    private final ReentrantLock lock = new ReentrantLock();    /** 一个lock对象的Condition实例 */    private final Condition trip = lock.newCondition();    /** 阻拦线程的总个数 */    private final int parties;    /** The command to run when tripped */    private final Runnable barrierCommand;    /** The current generation */    private Generation generation = new Generation();    /** 阻拦线程的剩余需要数量 */    private int count;

从该图可以看出CyclicBarrier有一个重入锁的变量lock并且持有一个该锁的Condition实例trip,即可以大概知道该CyclicBarrier会让线程尝试获取锁并且在拿到锁后将屏障个数减减操作,而后根据count的数量来决定能否调使用trip.await()操作,比方count==0表示最后一个到达屏障的线程,那么就不需要调使用trip的方法了.

构造方法

    public CyclicBarrier(int parties) {      this(parties, null);    }    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        this.count = parties;        this.barrierCommand = barrierAction;    }

第二个参数Runnable barrierAction表示的是当最后一个到达屏障的线程先执行完该barrierActionrun方法后再执行唤醒其余线程的操作.简单地说当到达屏障时,先执行barrierAction的业务再执行其余线程的业务.

await方法

await方法有两个,分别为await()await(long timeout, TimeUnit unit)方法,一个没有超时返回,另外一个有超时返回,但是两者都是调使用dowait(boolean timed, long nanos),该方法是整个CyclicBarrier的核心实现.

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }public int await(long timeout, TimeUnit unit)            throws InterruptedException,            BrokenBarrierException,            TimeoutException {        return dowait(true, unit.toNanos(timeout));    }

所以接下来的看看该方法dowait是如何实现的.

/**     * @param timed 能否需要超时     * @param nanos 时长     * @return 返回还需要等待多少个线程才可以到达屏障     * @throws InterruptedException 当前线程中断     * @throws BrokenBarrierException 有其余线程中断或者者其余线程超时     * @throws TimeoutException 当前线程等待超时     */    private int dowait(boolean timed, long nanos)            throws InterruptedException, BrokenBarrierException,            TimeoutException {        // 获取重入锁        final ReentrantLock lock = this.lock;        // 尝试获取锁        lock.lock();        try {            //System.out.println(Thread.currentThread().getName() + " get locks.");            // 取得当前代            final Generation g = generation;            // 假如有线程中断或者者超时            if (g.broken)                throw new BrokenBarrierException();            // 假如当前线程被中断            if (Thread.interrupted()) {                breakBarrier();                throw new InterruptedException();            }            int index = --count;            //System.out.format("index=%d\n", index);            if (index == 0) {  // 最后一个到达屏障的线程                boolean ranAction = false;                try {                    final Runnable command = barrierCommand;                    if (command != null)                        command.run();                    ranAction = true;                    nextGeneration(); //升级下一代                    return 0;                } finally {                    // 假如执行command.run发生异常,则breakBarrier                    if (!ranAction)                        breakBarrier();                }            }            // loop until tripped, broken, interrupted, or timed out            for (;;) {                try {                    if (!timed)                        trip.await();                    else if (nanos > 0L)                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    // 假如等待过程中有被线程中断                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        Thread.currentThread().interrupt();                    }                }                // 假如当代的broken为true,表明有线程被中断                if (g.broken)                    throw new BrokenBarrierException();                // 假如换代了 表示可以返回了                if (g != generation)                    return index;                // 假如超时则先break the current generation                // 再抛出超时异常                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 释放锁            //System.out.println(Thread.currentThread().getName() + " release locks.");            lock.unlock();        }    }/**     *  break the current generation     *  1. broken设置为true     *  2. count 重新设置为parties     *  3. 唤醒所有线程     */    private void breakBarrier() {        generation.broken = true;        count = parties;        trip.signalAll();    }    /**     *  start a new generation     *  1. 唤醒所有等待中的线程     *  2. count 重新设置为parties     *  3. generation 设置成一个新的Generation对象     */    private void nextGeneration() {        // signal completion of last generation        trip.signalAll();        // set up next generation        count = parties;        generation = new Generation();    }

该方法的流程大概如下:
1. 尝试获取锁
2. 假如不是最后一个到达屏障的线程,则进入for循环中一直等待(此时该线程会释放锁)直到被最后一个线程唤醒或者者被某个线程中断后调使用breakBarrier方法唤醒. 唤醒后需要竞争再次取得锁后才可以继续执行.
3. 假如是最后一个到达屏障的线程,假如barrierCommand不为空,则需要先执行barrierCommand.run()方法,而后通过nextGeneration唤醒等待的线程.
4. 在所有异常退出或者者正常退出都需要释放锁.
流程图如下

dowait.png

例子2

设置线程屏障为3,启动两个线程2秒超时等待,让最后一个线程3秒后才到达屏障.

package com.sourcecode.concurrencytools_CyclicBarrier;import java.util.concurrent.TimeUnit;public class CyclicBarrierTest5 {    static CyclicBarrier c = new CyclicBarrier(3);    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {        for (int i = 0; i < 2; i++) {            Thread thread = new MyThread();            thread.start();        }        TimeUnit.SECONDS.sleep(3);        System.out.println(Thread.currentThread().getName() + "------>" + "tries to wait!");        c.await();        System.out.println(Thread.currentThread().getName() + "------>" + "finishes!");    }    static class MyThread extends Thread {        @Override        public void run() {            try {                System.out.println(Thread.currentThread().getName() + " tries to wait!");                c.await(1, TimeUnit.SECONDS);                //c.await();            } catch (Exception e) {                System.out.println(Thread.currentThread().getName() + "---->" + e);                //System.out.println(Thread.currentThread().getName() + "------>" + c.isBroken() + ", interrupted status:" + Thread.currentThread().isInterrupted());            } finally {                System.out.println(Thread.currentThread().getName() + " finishes!");            }        }    }}

结果如下: 可以看到第一个线程出现超时异常后,表示该线程已经调使用了breakBarrier方法,所以可以看到后续的两个线程都是抛出BrokenBarrierException异常.

Thread-0 tries to wait!Thread-1 tries to wait!Thread-1---->java.util.concurrent.TimeoutExceptionThread-0---->com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierExceptionThread-0 finishes!Thread-1 finishes!main------>tries to wait!Exception in thread "main" com.sourcecode.concurrencytools_CyclicBarrier.BrokenBarrierException    at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.dowait(CyclicBarrier.java:69)    at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrier.await(CyclicBarrier.java:39)    at com.sourcecode.concurrencytools_CyclicBarrier.CyclicBarrierTest5.main(CyclicBarrierTest5.java:14)

isBroken方法和reset方法

/**     * @return 当前代能否被破坏, 被破坏的两种情况, 某个线程中断或者者等待超时     */    public boolean isBroken() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            return generation.broken;        } finally {            lock.unlock();        }    }public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            breakBarrier();   // break the current generation            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }

reset留作遇到好的例子后再分析

参考

1. Java并发编程的艺术

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » [Java源码][并发J.U.C]—并发工具类CyclicBarrier

发表回复