Java并发高级设施实现原理笔记

vscode看java api的源码非常舒适 提供了跳转到指定源码的功能

结合Oracle的JAVA文档食用更佳

Java相关api介绍网站推荐: Baeldung

CountDownLatch

核心是sync,Sync类是基于AbstractQueueSynchonizer这一抽象接口实现的

AbstractQueueSynchronizer

一个用链表实现的队列,对节点加自旋锁来实现synchronize。按照队列的性质,头部的节点会先获得申请锁的机会,但是不保证成功。

有state来存储状态,其中state是volatile的,即存储在内存中,立即更新,不存在写缓冲问题。

这里的加锁方式有exclusive/shared两种。实现上没有做显著区分

Sync

实现了上述接口,其中Count直接存储在了AbstractQueueSynchronizer(以下简称AQS)的state变量中。

CountDownLatch的countDown方法是调用sync的releaseShared方法实现的 而sync中又调用AQS的tryReleaseShared() 判断能否释放,如果能,就调用doReleaseShared()对队列里的节点做操作。具体会调整列表中节点的状态。

CountDownLatch的await方法是调用sync的tryAcquireSharedNanos方法实现的,其中有设置限时(这里以纳秒为单位)。除了返回之外还会抛出异常。sync的方法调用了AQS的tryAcquireShared ,doAcquireSharedNanos方法 try方法有三种状态,不成功(返回负数),shared mode成功但subsequent shared-mode acquisition不成功(返回0,这里理解成时序上接下来共享状态下的申请不成功),shared mode和subsequent shared-mode acquisition都成功

CyclicBarrier

cyclic表达了可以重复利用。

1
public CyclicBarrier(int parties,Runnable barrierAction)

parties代表参与的线程的数量,通过await登记到达了共同的阻塞状态(目的地)并进入阻塞,只有当参与的所有线程都调用await进入阻塞的时候,所有进程才能同时进入barrierAction所对应的操作。注意,和CountDownLatch不同的是,这里并没有新开一个总结的线程(类型是Runnable,是任务而非Thread线程),而是把最后一个进入阻塞状态的线程用来执行barrierAction

Generation

实现重用依赖于Generation这个类。线程会阻塞,停止,因此用Generation来装载可以实现重用。Generation的实例被分配给哪一个线程是不确定的,而且可以有多个被分配给使用CyclicBarrier的线程,但是可以确定的是,同一时间只能有一个Generation是活跃中的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}

/** The lock for guarding barrier entry */
//是之前提及的可重用锁
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
//Condition即java分配给每个对象的monitor相关数据结构,有notify notifyAll wait等方法
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
//...

/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();//condition中的方法,把所有线程都唤醒
// set up next generation
count = parties;
generation = new Generation();
}

/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

//可能抛出中断、超时 阻塞中断异常
private int dowait(boolean timed,long nanos) throws InterruptedException,BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//引用 浅拷贝
if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;//记录有线程到达目的地
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;//最后一个到达的线程执行方法
if (command != null)
command.run();
ranAction = true;
nextGeneration();//这一批结束了 进行更新 把状态重设
return 0;
} finally {
if (!ranAction)
breakBarrier();//没有要做的最终操作也要把这一代结束
}
}

// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();//condition的方法 让当前线程等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();//被打断了 而且当前一代没作废 主动作废
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();//如果出现意外上面的条件没执行 也要主动打断
}
}

if (g.broken)
throw new BrokenBarrierException();

if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

}

ScheduledThreadPoolExecutor

1
2
3
4
5
6
7
8
extends ThreadPoolExecutor //线程池 池化线程对应的资源 避免反复生成和销毁带来的代价
implements ScheduledExecutorService// extends ExecutorService 可以调度命令使得经历一段延迟之后再执行,或者阶段性的执行

//case:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.schedule(() -> {
System.out.println("Hello World");
}, 500, TimeUnit.MILLISECONDS);

线程池系列介绍

enable:激活 execute:执行

激活不一定执行,按照激活的顺序排队(FIFO)来被调度去提交执行

image-20211030113256401

Semaphore

permits:允许调用的资源,如果没有permit,acquire方法会阻塞;release方法释放permit资源。Semaphore通常用来限制进入临界区的线程数量。lock可以认为是数量为1的semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//case
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();//申请
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();//释放
}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}

内部实现——Sync extends AbstractQueuedSynchronizer

利用了AQS,与CountDownLatch类似,permit的数量被state记录

acquire方法调用了Sync的acquireSharedInterruptibly,实际调用了AQS的tryAcquireShared和doAcquireSharedInterruptibly,与CountDownLatch类似。

release方法调用了Sync的releaseShared,实际调用了AQS的tryReleaseShared和doReleaseShared

综合来看,在涉及数量的同步上,都利用了AQS 这里信号量的逻辑和AQS的逻辑基本一致,申请的到就继续,申请不到即阻塞

Exchanger

“Synchronization point” 说明可以认为这里的exchanger操作是一个可以视为原子操作 交换指定的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class FillAndEmpty {
Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
DataBuffer initialEmptyBuffer = ... a made-up type
DataBuffer initialFullBuffer = ...

class FillingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialEmptyBuffer;
try {
while (currentBuffer != null) {
addToBuffer(currentBuffer);
if (currentBuffer.isFull())
currentBuffer = exchanger.exchange(currentBuffer);//在这里交换
}
} catch (InterruptedException ex) { ... handle ... }
}
}

class EmptyingLoop implements Runnable {
public void run() {
DataBuffer currentBuffer = initialFullBuffer;
try {
while (currentBuffer != null) {
takeFromBuffer(currentBuffer);
if (currentBuffer.isEmpty())
currentBuffer = exchanger.exchange(currentBuffer);//在这里交换
}
} catch (InterruptedException ex) { ... handle ...}
}
}

void start() {
new Thread(new FillingLoop()).start();
new Thread(new EmptyingLoop()).start();
}
}

sun.misc.Unsafe U; compareAndSwapObject来实现原子操作

可以用来实现管道之类进程间通信的机制(?)

DelayQueue

1
2
3
public class DelayQueue<E extends Delayed>
extends AbstractQueue<E>
implements BlockingQueue<E> //有四种处理不能立刻满足的操作的方式

不限长度的延迟元素的队列,其中元素只有超时之后才能取出。队列头的元素是超时最久的元素

其中容纳的元素需要继承自Delayed类,还要重载compareTo以便进行比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DelayObject implements Delayed {
private String data;
private long startTime;

public DelayObject(String data, long delayInMilliseconds) {
this.data = data;
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
}

@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(Delayed o) {
return Ints.saturatedCast(
this.startTime - ((DelayObject) o).startTime);
}
}

例子:超时的消费者和生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class DelayQueueProducer implements Runnable {

private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToProduce;
private Integer delayOfEachProducedMessageMilliseconds;

// standard constructor

@Override
public void run() {
for (int i = 0; i < numberOfElementsToProduce; i++) {
DelayObject object
= new DelayObject(
UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
System.out.println("Put object: " + object);
try {
queue.put(object);
Thread.sleep(500);
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}

public class DelayQueueConsumer implements Runnable {
private BlockingQueue<DelayObject> queue;
private Integer numberOfElementsToTake;
public AtomicInteger numberOfConsumedElements = new AtomicInteger();

// standard constructors

@Override
public void run() {
for (int i = 0; i < numberOfElementsToTake; i++) {
try {
DelayObject object = queue.take();
numberOfConsumedElements.incrementAndGet();
System.out.println("Consumer take: " + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


@Test
public void givenDelayQueue_whenProduceElement
_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
// given
ExecutorService executor = Executors.newFixedThreadPool(2);

BlockingQueue<DelayObject> queue = new DelayQueue<>();
int numberOfElementsToProduce = 2;
int delayOfEachProducedMessageMilliseconds = 500;
DelayQueueConsumer consumer = new DelayQueueConsumer(
queue, numberOfElementsToProduce);
DelayQueueProducer producer = new DelayQueueProducer(
queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);

// when
executor.submit(producer);
executor.submit(consumer);

// then
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();

assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
}