Java/J.U.C(java.util.concurrent)包的梳理
java.util.concurrent
提供了并发编程的解决方案,主要包括两大块: - CAS:是java.util.concurrent.atomic
包的基础 - AQS:是java.util.concurrent.locks
包以及一些常用类(比如 Semophore、ReentrantLock)的基础
J.U.C 包的分类: - 线程执行器:executor - 锁:locks - 原子变量类:atomic - 并发工具类:tools - 并发集合:collections
并发工具类
- CountDownLatch:让主线程等待一组事件发生后继续执行。主线程设置一个 CountDownLatch,然后主线程调用 CountDownLatch 对象的 await() 方法等待计数器,子线程调用 CountDownLatch 对象的 countDown() 方法将计数器减1。直到变为 0,主线程才继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatchDemo demo = new CountDownLatchDemo();
demo.go();
}
private void go() throws InterruptedException {
//设置一个 CountDownLatch,计数器的值为 3
CountDownLatch latch = new CountDownLatch(3);
//依次创建 3 个线程,并启动
new Thread(new Task(latch), "Thread1").start();
Thread.sleep(1000);
new Thread(new Task(latch), "Thread2").start();
Thread.sleep(1000);
new Thread(new Task(latch), "Thread3").start();
latch.await();//等待子线程调用 countDown() 方法,直到计数器为0
System.out.println("所有线程已到达,主线程开始执行:" + Thread.currentThread().getName());
}
class Task implements Runnable {
private CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
//每调用一次 countDown(),CountDownLatch 中计数器的值就会减1
latch.countDown();
}
}
} - CyclicBarrier:阻塞自己当前线程,等待其他线程,所有线程必须同时到达后,当前线程才能继续执行。所有线程到达后,可以触发执行另一个预先设置的线程。和 CountDownLatch 的区别是:在计数器不为 0 时,CountDownLatch 不会阻塞子线程;而 CyclicBarrier 会阻塞所有线程,直到计数器为 0,所有线程才继续执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40public class CyclicBarrierDemo {
public static void main(String[] args) throws InterruptedException {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.go();
}
private void go() throws InterruptedException {
//设置一个 CyclicBarrier,计数器的值为 3
CyclicBarrier barrier = new CyclicBarrier(3);
//依次创建 3 个线程,并启动
new Thread(new Task(barrier), "Thread1").start();
Thread.sleep(1000);
new Thread(new Task(barrier), "Thread2").start();
Thread.sleep(1000);
new Thread(new Task(barrier), "Thread3").start();
}
class Task implements Runnable {
private CyclicBarrier barrier;
public Task(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
System.out.println("线程: " + Thread.currentThread().getName() + "已经到达");
try {
//计数器减1,并阻塞,直到计数器为 0,才继续往下执行
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程: " + Thread.currentThread().getName() + "开始处理");
}
}
} - Semaphore:控制某个资源可同时被访问的线程数量。通过 acquire() 方法访问资源,如果信号量满了就阻塞,直到可访问。使用 release() 释放资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class SemaphoreDemo {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//设置信号量,最多只能 5 个线程同时访问
Semaphore semaphore = new Semaphore(5);
//模拟 20 个客户端同时访问
for (int i = 0; i < 20; i++) {
final int NUMBER = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Accessing:" + NUMBER);
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
};
service.execute(runnable);
}
//关闭线程池
service.shutdown();
}
} - Exchanger:两个线程到达同步点后,相互交换数据 # 并发集合:collections
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class ExchangerDemo {
static Exchanger<String> exchanger = new Exchanger();
public static void main(String[] args) {
//代表男生和女生
ExecutorService service = Executors.newFixedThreadPool(2);
service.execute(() -> {
//女生对男生说的话
try {
String girl = exchanger.exchange("我其实暗恋你很久了....");
System.out.println("女生说:" + girl);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
service.execute(() -> {
try {
System.out.println("女生慢慢地从教室里走出来...");
TimeUnit.SECONDS.sleep(2);
//男生对女生说的话
String boy = exchanger.exchange("我很喜欢你");
System.out.println("男生说:" + boy);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} - BlockingQueue:提供可阻塞的入队和出队操作。主要用于生产者和消费者模式,在多线程场景下生产者在队尾添加元素,而消费者线程则在队头消费元素,通过这种方式能够隔离任务的生产和消费。有 7 个实现类
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。先进先出,初始化容量后就不可改变
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列或者无界阻塞队列。先进先出,如果初始化时不指定大小,则默认大小为 Integer.MAX_VALUE,可以认为是无界的。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
- DelayQueue:一个支持延迟获取元素的无界阻塞队列
- SynchronousQueue:仅允许容纳一个元素。当一个线程添加元素后会被阻塞,直到这个元素被另一个线程消费掉
- LinkedTransferQueue:一个由链表组成的无界阻塞队列。相当于 SynchronousQueue + LinkedBlockingQueue。性能比 LinkedBlockingQueue 高,比 SynchronousQueue 能存储更多的元素
- LinkedBlockingDQueue:一个由链表组成的双向阻塞队列。在 Fork Join 中用到,每个工作线程都有自己的任务队列,比生产者消费者模型有更好的伸缩性。如果一个工作线程的任务队列消费完了,可以窃取其他任务队列尾部的任务,进一步减少竞争