JUC提供了一些控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间进行合作,让线程之间相互配合,来满足业务需求。
概览
控制并发流程的工具类如下:

CountDownLatch
CountDownLatch使用一个计数器,计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成一些任务,然后在CountDownLatch上等待的线程就可以恢复执行接下来的任务。需要注意的CountDownLatch不能重置计数值,一个CountDownLatch使用完毕,如果还想进行倒数,就得重新new一个对象。
CountDownLatch主要方法如下:
- CountDownLatch(int count):仅有的一个构造函数,count就是需要传入的计数值。
- await():调用await()的线程将会被挂起,直到count为0时才继续执行。
- countDown():将count减1,直到为0时,等待的线程被唤醒。
CountDownLatch用法一:一个线程等待多个线程都执行完毕,再继续执行自己的工作。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class CountDownLatchDemo1 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); ExecutorService executorService = Executors.newFixedThreadPool(5); Runnable runnable = () -> { try { Thread.sleep(new Random().nextInt(1000)); System.out.println(Thread.currentThread().getName() + "已完成"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }; for (int i = 0; i < 5; i++) { executorService.execute(runnable); } latch.await(); System.out.println("所有线程已完成"); executorService.shutdown(); } }
|

CountDownLatch用法二:多个线程等待某一个线程的信号,同时开始执行。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class CountDownLatchDemo2 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); ExecutorService executorService = Executors.newFixedThreadPool(5); Runnable runnable = () -> { try { System.out.println(Thread.currentThread().getName() + "准备开始执行"); latch.await(); System.out.println(Thread.currentThread().getName() + "开始执行"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }; for (int i = 0; i < 5; i++) { executorService.execute(runnable); } Thread.sleep(1000); System.out.println("所有线程可以开始执行"); latch.countDown(); executorService.shutdown(); } }
|

Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。如图:

Semaphore的主要方法如下:
- acquire():从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
- acquire(int permits):获取permits个信号量,否则等待。
- release():释放一个许可,将其返回给信号量。
- release(int permits):释放permits个获取的信号量。
- tryAcquire():尝试获取一个信号量,获取成功返回true。
- tryAcquire(long timeout, TimeUnit unit)():尝试获取一个信号量,直到出了指定的等待时间,获取成功返回true。
Semaphore的基本用法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(50); Runnable runnable = () -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "获取到了许可证"); } catch (InterruptedException e) { e.printStackTrace(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + "释放了许可证"); semaphore.release(); } }; for (int i = 0; i < 50; i++) { executorService.execute(runnable); } executorService.shutdown(); } }
|

Condition接口
Synchronized加锁状态时,是使用wait/notify/notifyAll进行线程间的通信。那么在使用ReentrantLock加锁时,也可以通过Condition进行线程间的通信,Condition中await()、signal()、signalAll()分别对应Object中的wait()、notify()、notifyAll()方法,但是Condition更加灵活。
Condition常用的方法如下:
- await():使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。
- await(long time, TimeUnit unit):当前线程等待,直到线程被唤醒或者中断,或者等待时间耗尽。
- signal():唤醒一个等待的线程。
- signalAll():唤醒所有等待的线程。
Condition基本用法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo1 {
private static ReentrantLock lock = new ReentrantLock(); private static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException { Thread thread1 = new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "进入等待状态"); condition.await(); System.out.println(Thread.currentThread().getName() + "被唤醒了"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }); Thread thread2 = new Thread(() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + "唤醒其他线程"); condition.signal(); } finally { lock.unlock(); } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); } }
|

也可以使用Condition实现生产者消费者模式,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo2 {
private static ReentrantLock lock = new ReentrantLock(); private static Condition consumer = lock.newCondition(); private static Condition producer = lock.newCondition(); private static int size = 10; private static Deque<Integer> queue = new ArrayDeque<>(size);
public static void main(String[] args) { Thread consumer = new Consumer(); Thread producer = new Producer(); consumer.start(); producer.start(); }
static class Consumer extends Thread {
@Override public void run() { put(); }
private void put() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("队列空,等待生产数据"); consumer.await(); } queue.poll(); System.out.println("消费者消费了一个数据,还剩" + queue.size() + "个"); Thread.sleep(100); producer.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } }
static class Producer extends Thread {
@Override public void run() { take(); }
private void take() { while (true) { lock.lock(); try { while (queue.size() == size) { System.out.println("队列满了,等待消费数据"); producer.await(); } queue.offer(1); System.out.println("生产者生产了一个数据,还剩" + queue.size() + "个"); Thread.sleep(100); consumer.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } }
|

CyclicBarrier
CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程。CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch作用于事件,CyclicBarrier作用与线程。并且CyclicBarrier可以被重用。
当有大量线程相互配合,分别计算不同任务,并且需要最后统一汇总的时候,我们就可以使用CyclicBarrier。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点进行等待,直到所有的线程都到了集结点,那么该栅栏就会被撤销,所有线程再统一出发,继续执行剩下的任务。
CyclicBarrier示例代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> System.out.println("所有线程已经集合,统一出发"));
public static void main(String[] args) { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName() + "前往集合地点"); try { Thread.sleep(new Random().nextInt(1000)); System.out.println(Thread.currentThread().getName() + "已经到了集合地点,等待其他线程"); barrier.await(); System.out.println(Thread.currentThread().getName() + "已经出发"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }; for (int i = 0; i < 5; i++) { new Thread(runnable).start(); } } }
|
