Java/J.U.C(java.util.concurrent)包的梳理

Java/J.U.C(java.util.concurrent)包的梳理

java.util.concurrent提供了并发编程的解决方案,主要包括两大块: - CAS:是java.util.concurrent.atomic包的基础 - AQS:是java.util.concurrent.locks包以及一些常用类(比如 Semophore、ReentrantLock)的基础

J.U.C 包的分类: - 线程执行器:executor - 锁:locks - 原子变量类:atomic - 并发工具类:tools - 并发集合:collections

并发工具类

  • CountDownLatch:让主线程等待一组事件发生后继续执行。主线程设置一个 CountDownLatch,然后主线程调用 CountDownLatch 对象的 await() 方法等待计数器,子线程调用 CountDownLatch 对象的 countDown() 方法将计数器减1。直到变为 0,主线程才继续执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
    CountDownLatchDemo demo = new CountDownLatchDemo();
    demo.go();
    }

    private void go() throws InterruptedException {
    //设置一个 CountDownLatch,计数器的值为 3
    CountDownLatch latch = new CountDownLatch(3);
    //依次创建 3 个线程,并启动
    new Thread(new Task(latch), "Thread1").start();
    Thread.sleep(1000);
    new Thread(new Task(latch), "Thread2").start();
    Thread.sleep(1000);
    new Thread(new Task(latch), "Thread3").start();
    latch.await();//等待子线程调用 countDown() 方法,直到计数器为0
    System.out.println("所有线程已到达,主线程开始执行:" + Thread.currentThread().getName());
    }

    class Task implements Runnable {
    private CountDownLatch latch;

    public Task(CountDownLatch latch) {
    this.latch = latch;
    }

    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName());
    //每调用一次 countDown(),CountDownLatch 中计数器的值就会减1
    latch.countDown();
    }
    }
    }
  • CyclicBarrier:阻塞自己当前线程,等待其他线程,所有线程必须同时到达后,当前线程才能继续执行。所有线程到达后,可以触发执行另一个预先设置的线程。和 CountDownLatch 的区别是:在计数器不为 0 时,CountDownLatch 不会阻塞子线程;而 CyclicBarrier 会阻塞所有线程,直到计数器为 0,所有线程才继续执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class CyclicBarrierDemo {
    public static void main(String[] args) throws InterruptedException {
    CyclicBarrierDemo demo = new CyclicBarrierDemo();
    demo.go();
    }

    private void go() throws InterruptedException {
    //设置一个 CyclicBarrier,计数器的值为 3
    CyclicBarrier barrier = new CyclicBarrier(3);
    //依次创建 3 个线程,并启动
    new Thread(new Task(barrier), "Thread1").start();
    Thread.sleep(1000);
    new Thread(new Task(barrier), "Thread2").start();
    Thread.sleep(1000);
    new Thread(new Task(barrier), "Thread3").start();
    }

    class Task implements Runnable {
    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
    this.barrier = barrier;
    }

    @Override
    public void run() {
    System.out.println("线程: " + Thread.currentThread().getName() + "已经到达");

    try {
    //计数器减1,并阻塞,直到计数器为 0,才继续往下执行
    barrier.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("线程: " + Thread.currentThread().getName() + "开始处理");
    }
    }
    }
  • Semaphore:控制某个资源可同时被访问的线程数量。通过 acquire() 方法访问资源,如果信号量满了就阻塞,直到可访问。使用 release() 释放资源
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class SemaphoreDemo {
    public static void main(String[] args) {
    ExecutorService service = Executors.newCachedThreadPool();
    //设置信号量,最多只能 5 个线程同时访问
    Semaphore semaphore = new Semaphore(5);
    //模拟 20 个客户端同时访问
    for (int i = 0; i < 20; i++) {
    final int NUMBER = i;
    Runnable runnable = new Runnable() {
    @Override
    public void run() {
    try {
    semaphore.acquire();
    System.out.println("Accessing:" + NUMBER);
    Thread.sleep((long) (Math.random() * 1000));
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    semaphore.release();
    }
    }
    };
    service.execute(runnable);
    }
    //关闭线程池
    service.shutdown();
    }
    }
  • Exchanger:两个线程到达同步点后,相互交换数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class ExchangerDemo {
    static Exchanger<String> exchanger = new Exchanger();

    public static void main(String[] args) {
    //代表男生和女生
    ExecutorService service = Executors.newFixedThreadPool(2);
    service.execute(() -> {
    //女生对男生说的话
    try {
    String girl = exchanger.exchange("我其实暗恋你很久了....");
    System.out.println("女生说:" + girl);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });

    service.execute(() -> {

    try {
    System.out.println("女生慢慢地从教室里走出来...");
    TimeUnit.SECONDS.sleep(2);
    //男生对女生说的话
    String boy = exchanger.exchange("我很喜欢你");
    System.out.println("男生说:" + boy);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    }
    }
    # 并发集合:collections
  • BlockingQueue:提供可阻塞的入队和出队操作。主要用于生产者和消费者模式,在多线程场景下生产者在队尾添加元素,而消费者线程则在队头消费元素,通过这种方式能够隔离任务的生产和消费。有 7 个实现类
    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。先进先出,初始化容量后就不可改变
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列或者无界阻塞队列。先进先出,如果初始化时不指定大小,则默认大小为 Integer.MAX_VALUE,可以认为是无界的。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
    • DelayQueue:一个支持延迟获取元素的无界阻塞队列
    • SynchronousQueue:仅允许容纳一个元素。当一个线程添加元素后会被阻塞,直到这个元素被另一个线程消费掉
    • LinkedTransferQueue:一个由链表组成的无界阻塞队列。相当于 SynchronousQueue + LinkedBlockingQueue。性能比 LinkedBlockingQueue 高,比 SynchronousQueue 能存储更多的元素
    • LinkedBlockingDQueue:一个由链表组成的双向阻塞队列。在 Fork Join 中用到,每个工作线程都有自己的任务队列,比生产者消费者模型有更好的伸缩性。如果一个工作线程的任务队列消费完了,可以窃取其他任务队列尾部的任务,进一步减少竞争

评论