多处理编程的艺术
这本书我估计得看两遍,第一遍看算法/理论,第二遍看一遍证明。
锁/同步常用检测思维:
- 如何保证互斥?
- 如何保证公平性?简单来说,锁的先后顺序保证就是公平
- 如何保证不会饿死?
- 如何保证不会出现唤醒丢失/虚假唤醒?如果检测是否需要睡眠,和正式进入睡眠不是一步,一般就需要防备唤醒丢失。防备唤醒丢失的方法还是,使用锁来保证“同步”的线程不会出现同时执行交互的操作(同步的概念看第十章的开头)。虚假唤醒是指当醒过来的时候发现条件不满足(阅读第八章),因此需要多次检测条件(条件变量)是否满足
锁/同步设计常用思路:
- 对于相同(原子)变量的操作常常是导致性能消耗的关键,因此想办法将同样一个(原子)变量,拆分为属于不同部分的操作,见第十章对于有限同步队列最后 的点评与操作。除此之外,对同一事物的竞争是导致性能消耗的关键第十章的双数数据结构提供了一种保护操作。
- 除了对相同(原子)变量的操作导致性能消耗,另一方面,强行要求指令线性执行,而不是并发执行也会导致性能不高,对于这种问题:
- 具体见例子第十一章的11.3节:之所以无锁栈性能不好,不是因为top产生了竞争,而是因为强制要求线性并发,不能多个同时burst,这种加个缓冲垫的想法是非常常见且有效的。一个更具体的例子是:
- 这两条具体的理念看第十二章的12.2节,再看看合并树的操作。总而言之,尽量避免memory contention—多个线程同时访问相同的内存地址和尽量实现real parallelism,
- 无锁操作实现的关键往往是有每个前提保证的,就以第十章的无锁队列实现为例,每一步都要求保证当前操作的节点是最后的节点!设计无锁的数据结构建议看第十章无锁队列
第一章 introduction
第二章 互斥锁
2.1-2.5 互斥锁的几个例子
两种简单的锁,这两种锁各有各的缺点,但是相互补足了对面:一种锁的驱动机制会导致另一种锁卡住
pertson锁,结合了两种锁的优点
filter锁,适合多个线程,结合了多个线程的特点,有两个可以保证:
- 总会有某个线程进入第
l
层成功。 - 倘若有多个线程想进入第
l
层,那么至少会有一个线程被阻塞。
1 class Filter implements Lock {
2 int[] level;
3 int[] victim;
4 public Filter(int n) {
5 level = new int[n];
6 victim = new int[n]; // use 1..n-1
7 for (int i = 0; i < n; i++) {
8 level[i] = 0;
9 }
10 }
11 public void lock() {
12 int me = ThreadID.get();
13 for (int i = 1; i < n; i++) { // attempt to enter level i
14 level[me] = i;
15 victim[i] = me;
16 // spin while conflicts exist
17 while ((∃k != me) (level[k] >= i && victim[i] == me)) {};
18 }
19 }
20 public void unlock() {
21 int me = ThreadID.get();
22 level[me] = 0;
23 }
24 }
2.6-2.8 贝克算法和label有序性的理解
公正性,将锁分成doorway section核waiting section。
兰博贝克算法,贝克算法要求后来的线程拿到的label必须是更大的。
1 class Bakery implements Lock {
2 boolean[] flag;
3 Label[] label;
4 public Bakery (int n) {
5 flag = new boolean[n];
6 label = new Label[n];
7 for (int i = 0; i < n; i++) {
8 flag[i] = false; label[i] = 0;
9 }
10 }
11 public void lock() {
12 int i = ThreadID.get();
13 flag[i] = true;
14 label[i] = max(label[0], ...,label[n-1]) + 1;
15 while ((∃k != i)(flag[k] && (label[k],k) << (label[i],i))) {};
16 }
17 public void unlock() {
18 flag[ThreadID.get()] = false;
19 }
20 }
实际上就是个单调的线性
(label[i], i) << (label[j], j))
if and only if
label[i] < label[j] or label[i] = label[j] and i < j.
接下来提出了有向图的概念,a->b的意思是a晚于b发生。对于两个线程,我们只需要设置三个token的图,如图2.13s所示,既能保证他们是有序的。为了能够解决多线程的复杂情况,使用有向图来表示,提出下面的说法,实际上H也是图:
- We say that A dominates B in G if every node of A has edges directed to every node of B.
- G ◦ H 就是graph multiplication ,Replace every node v of G by a copy of H (denoted Hv), and let Hv dominate Hu in G ◦ H if v dominates u in G. 这里所谓的图的替代,是指直接全替代代表一个节点?
对于label算法的理解:理解n个线程的label算法的关键是token代表的节点从来都是有序构成一个环,而不会出现不知道label相互竞争的情况。对两个线程而言,之所以两个线程不会出现label的竞争是因为T2的最短路径里要求三个节点。这部分建议直接看中文第一版的《多处理器编程的艺术》。这部分没看的特别明白需要重看。
下面是一些要注意到的名词:
- Directed graph 有向图 Precedence graph 前趋图
- irreflexive 自反关系:对自己是不成立的 antisymmetric 反对称关系 transitive 传递关系
- trivial 容易得,不费力得
- safety properties 不可进入非法状态 liveness properties运行状态中会达到的糟糕状态,只能找维基百科的说法 safety properties informally require that “something bad will never happen”
2.9 基于读/写操作的锁的不足
- Is there a clever Lock algorithm based on reading and writing memory that avoids this overhead? We now demonstrate that the answer is no. 为什么不能呢?因为如果多个线程同时读写,那么他们的结果可能在看到之前就被别的线程改写了。所以必须需要N个地方,换言之每个线程只能改自己的
- A Lock object state s is inconsistent in any global state where some thread is in the critical section, but the lock state is compatible with a global state in which no thread is in the critical section or is trying to enter the critical section. 这里实际上实说,互斥的锁必然是状态一致的,不一致会导致互斥失败
第三章 并发编程
这一章是帮助理解什么是顺序一致性的,但是感觉读起来很蛋疼。这里面实际上有一个问题,怎么判断一个数据结构是不是满足线程安全呢?
3.0-3.3一个反直觉但正确的顺序一致性
sequential consistency 顺序一致性,这个由下面两个定理定义。我们当然想要求real time order,但这里的sequential consistency并不保持real-time order,它并不要求要求不同线程之间存在的一种顺序,使得一个fifo的dequeue和enqueue是正确的。换言之,这种顺序一致性只是保证了一个线程内的程序调用顺序和代码顺序是一致的,这并不违背我们按照《A primer xxx》给出的顺序一致性。建议直接看这个问题:https://stackoverflow.com/questions/19209274/example-of-execution-which-is-sequentially-consistent-but-not-quiescently-consis
- Principle 3.3.1. Method calls should appear to happen in a one-at-a-time, sequential order. 这个只要求一个接一个.
-
Principle 3.3.2. Method calls should appear to take effect in program order. 程序的调用顺序应该和代码的执行顺序是一致的,而且应当符合我们的预期。
- 顺序一致性并不是阻塞的,这句话有点令人感觉蛋疼,和工业也不相符。
- 这里引入了一个复合的概念,每个个体满足p,如果全体也满足p那么就说p这个条件是复合的。但是对于SC则不满足这一点具体的例子见P57,这东西违背了顺序一致性,因为它自己构成了一个循环
3.4-3.5 线性一致性和静态一致性
这个东西就比刚才那个更符合人类的想法了。
- Principle 3.4.1. Each method call should appear to take effect instantaneously at some moment between its invocation and response. 就是可以将真正执行的时候,看成一个时刻,这个时刻在call发生和response返回之间。这就是线性一致性
- Principle 3.5.1. Method calls separated by a period of quiescence should appear to take effect in their real-time order. 严格按照发生时刻的顺序执行,这个就是静态一致性
3.6 线性一致性的正式定义
非常恶心,非常非常恶心,不说人话的典型,锁的理论表达,包括下面几章等以后再看吧。
第七章 自旋锁和竞争
这章实际上需要理解的就是锁的有序性,我们只要保证锁的获取是有序的,不会出现多个进入,那么就满足了互斥。然后只要不断将等待时间乘以二,那么就可以减少同时的争用。先进先出属于OPTIONAL的选项。
7.1-7.4 几种不同的锁
TAS锁,TTAS锁,回退锁,为什么性能不一致?实际上是因为TAS锁竞争的时候对于缓存要求太高,经常要求使得其他的缓存无效,TTAS只要求GetS操作,不会强行同步总线和缓存。回退锁是减少了竞争的发生,实际上就是让出来临界区。实际上这东西提供了我们三个东西:
- 代码层面我们认为逻辑一致性的东西,已经在一定程度上避免了问题的出现,但是这种层面只是问题设计的第一层面
- 因为代码是需要跑的,所以在实现的时候不但要考虑上层层面,还要考虑底层层面的实现,对锁而言就是缓存协议,同步等的代价
- 最后一个就是要养成一个从上向下,看到底层的习惯了。还得尝试提出,实现方面的抽象。
TAS锁:
1 public class TASLock implements Lock {
2 AtomicBoolean state = new AtomicBoolean(false);
3 public void lock() {
4 while (state.getAndSet(true)) {}
5 }
6 public void unlock() {
7 state.set(false);
8 }
9 }
TTAS锁:
1 public class TTASLock implements Lock {
2 AtomicBoolean state = new AtomicBoolean(false);
3 public void lock() {
4 while (true) {
5 while (state.get()) {};
6 if (!state.getAndSet(true))
7 return;
8 }
9 }
10 public void unlock() {
11 state.set(false);
12 }
13 }
回退锁:
1 public class Backoff {
2 final int minDelay, maxDelay;
3 int limit;
4 public Backoff(int min, int max) {
5 minDelay = min;
6 maxDelay = max;
7 limit = minDelay;
8 }
9 public void backoff() throws InterruptedException {
10 int delay = ThreadLocalRandom.current().nextInt(limit);
11 limit = Math.min(maxDelay, 2 * limit);
12 Thread.sleep(delay);
13 }
14 }
1 public class BackoffLock implements Lock {
2 private AtomicBoolean state = new AtomicBoolean(false);
3 private static final int MIN_DELAY = ...;
4 private static final int MAX_DELAY = ...;
5 public void lock() {
6 Backoff backoff = new Backoff(MIN_DELAY, MAX_DELAY);
7 while (true) {
8 while (state.get()) {};
9 if (!state.getAndSet(true)) {
10 return;
11 } else {
12 backoff.backoff();
13 }
14 }
15 }
16 public void unlock() {
17 state.set(false);
18 }
19 ...
20 }
7.5 基于队列的锁
7.5.1 基于数组的锁
基于数组的队列锁,tail是原子变量,保证每个线程都不会取到一样的值。每个flag[slot]为true代表锁可用,而flag[slot]为false代表不可用。因为使用了共享的一个数组,让多个线程不会在同一个地方产生竞争,因为可以提供竞争较少的服务。但是如果发生了高速缓存伪共享,(也就是说,不同内存块的内容被缓存到同一块缓存行上,不同的CPU对同一块缓存行的独写,注意这个不是同一个地址的独写,会导致高速缓存频繁的换入换出)同样会导致竞争激烈。因此如果把指针转动的幅度变大,就可以有效避免高速缓存伪共享的问题。
这东西我感觉像是一个表盘,转到谁就代表谁可以写数据,这个锁的传递是释放的时候传递的。但是有个问题如果最后又重新转到了头部,那么就会出现同时有两个线程获得锁了,这很糟糕。下面还是从三个层面来考虑:
- FIFO,是的,这个锁是FIFO的
- 会饿死吗?不会,只要能排上号的线程,最后都是会转到的
1 public class ALock implements Lock {
2 ThreadLocal<Integer> mySlotIndex = new ThreadLocal<Integer> (){
3 protected Integer initialValue() {
4 return 0;
5 }
6 };
7 AtomicInteger tail;
8 volatile boolean[] flag;
9 int size;
10 public ALock(int capacity) {
11 size = capacity;
12 tail = new AtomicInteger(0);
13 flag = new boolean[capacity];
14 flag[0] = true;
15 }
16 public void lock() {
17 int slot = tail.getAndIncrement() % size;
18 mySlotIndex.set(slot);
19 while (!flag[slot]) {};
20 }
21 public void unlock() {
22 int slot = mySlotIndex.get();
23 flag[slot] = false;
24 flag[(slot + 1) % size] = true;
25 }
26 }
7.5.2 基于CLH队列的锁
这个不如叫做链表锁吧,这个东西的有序是用链表的getAndSet保证的,因为要求锁的传递。这个东西对NUMA不友好,因为每个线程判断是不是可以上锁的时候都是要看前面的缓存块是不是锁住的。我们看看这个锁提供的特性
- 饿死,不会饿死
- FIFO,确实FIFO
1 public class CLHLock implements Lock {
2 AtomicReference<QNode> tail;
3 ThreadLocal<QNode> myPred;
4 ThreadLocal<QNode> myNode;
5 public CLHLock() {
6 tail = new AtomicReference<QNode>(new QNode());
7 myNode = new ThreadLocal<QNode>() {
8 protected QNode initialValue() {
9 return new QNode();
10 }
11 };
12 myPred = new ThreadLocal<QNode>() {
13 protected QNode initialValue() {
14 return null;
15 }
16 };
17 }
18 public void lock() {
19 QNode qnode = myNode.get();
20 qnode.locked = true;
21 QNode pred = tail.getAndSet(qnode);
22 myPred.set(pred);
23 while (pred.locked) {}
24 }
25 public void unlock() {
26 QNode qnode = myNode.get();
27 qnode.locked = false;
28 myNode.set(myPred.get());
29 }
30 class QNode {
31 volatile boolean locked = false;
32 }
33 }
7.5.3 MCS队列锁
这个锁不一样的地方是,每个线程看是否执行都是看自己控制的对象,所以每个对象都必须等待前面的对象释放掉自己。由于只有一个tail是可以原子CompareAndSet的,所以在unlokc的时候需要用tail这个fast path来等待slow path也就是后面的节点和前面节点挂起来的操作。这个锁和CLH锁相比,优点相同,但是因为检查的是本地的内存,所以对NUMA更友好。
1 public class MCSLock implements Lock {
2 AtomicReference<QNode> tail;
3 ThreadLocal<QNode> myNode;
4 public MCSLock() {
5 tail = new AtomicReference<QNode>(null);
6 myNode = new ThreadLocal<QNode>() {
7 protected QNode initialValue() {
8 return new QNode();
9 }
10 };
11 }
12 public void lock() {
13 QNode qnode = myNode.get();
14 QNode pred = tail.getAndSet(qnode);
15 if (pred != null) {
16 qnode.locked = true;
17 pred.next = qnode;
18 // wait until predecessor gives up the lock
19 while (qnode.locked) {}
20 }
21 }
22 public void unlock() {
23 QNode qnode = myNode.get();
24 if (qnode.next == null) {
25 if (tail.compareAndSet(qnode, null))
26 return;
27 // wait until successor fills in its next field
28 while (qnode.next == null) {}
29 }
30 qnode.next.locked = false;
31 qnode.next = null;
32 }
33 class QNode {
34 volatile boolean locked = false;
35 volatile QNode next = null;
36 }
37 }
7.6 实现了超时功能的基于队列的锁
对于回退锁等简单锁,超时很简单,几步以后直接返回。而对于队列锁,返回则很难,不能直接把自己的锁标记为成功,因此采用一种lazy的做法,标记为abandoned,如果被抛弃的锁发现前面这货是被丢弃的,那么直接去检查之前的锁。下面的实现里,如果某个QNode的pred字段为NULL,那么要么是还没获取到锁,在等待获得锁,要么是还没获得锁,打释放掉锁。如果QNode的pred字段指向QNode AVAILABLE,那么这个QNode之前关联的线程已经释放,可以获取这个锁。如果pred字段指向其他节点,非NULL,也不是AVAILABLE,说明前面的QNode关联的线程放弃了锁请求,我们直接占有这个QNode去做检查。
1 public class TOLock implements Lock{
2 static QNode AVAILABLE = new QNode();
3 AtomicReference<QNode> tail;
4 ThreadLocal<QNode> myNode;
5 public TOLock() {
6 tail = new AtomicReference<QNode>(null);
7 myNode = new ThreadLocal<QNode>() {
8 protected QNode initialValue() {
9 return new QNode();
10 }
11 };
12 }
13 ...
14 static class QNode {
15 public volatile QNode pred = null;
16 }
17 }
18 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
19 long startTime = System.currentTimeMillis();
20 long patience = TimeUnit.MILLISECONDS.convert(time, unit);
21 QNode qnode = new QNode();
22 myNode.set(qnode);
23 qnode.pred = null;
24 QNode myPred = tail.getAndSet(qnode);
25 if (myPred == null || myPred.pred == AVAILABLE) {
26 return true;
27 }
28 while (System.currentTimeMillis() - startTime < patience) {
29 QNode predPred = myPred.pred;
30 if (predPred == AVAILABLE) {
31 return true;
32 } else if (predPred != null) {
33 myPred = predPred;
34 }
35 }
36 if (!tail.compareAndSet(qnode, myPred))
37 qnode.pred = myPred;
38 return false;
39 }
40 public void unlock() {
41 QNode qnode = myNode.get();
42 if (!tail.compareAndSet(qnode, null))
43 qnode.pred = AVAILABLE;
44 }
7.7 层次锁
跨cluster锁的解决方式,不外乎锁队列。请求同类(cluster)队列的锁被称为队列锁。每个不同的cluster请求一个大的共享的锁,每次线程申请锁的时候,先锁cluster内部的锁,在请不同cluster共享的锁。释放的时候先检查本cluster内部有没有锁等着,有的话先释放自己cluster的锁,然后让自己cluster的锁跑着,等到确定没有了再释放大共享锁。
7.8 复合锁
一种结合了好理解,而且要求常量空间的锁。实际上就是等待队列前面的锁该等就死等着把,等待队列后面的锁你们按照指数级别增长等待事件,嗯,既减少了竞争,又让前面的线程忙等。这种原地的写法实际上在我看来,糟糕透了,炫技意味浓厚。
acquireQNode
就是尝试进入队列spliceQNode
则尝试waitForPredecessor
尝试获取锁,等待一段时间
1 public class CompositeLock implements Lock{
2 private static final int SIZE = ...;
3 private static final int MIN_BACKOFF = ...;
4 private static final int MAX_BACKOFF = ...;
5 AtomicStampedReference<QNode> tail;
6 QNode[] waiting;
7 ThreadLocal<QNode> myNode = new ThreadLocal<QNode>() {
8 protected QNode initialValue() { return null; };
9 };
10 public CompositeLock() {
11 tail = new AtomicStampedReference<QNode>(null,0);
12 waiting = new QNode[SIZE];
13 for (int i = 0; i < waiting.length; i++) {
14 waiting[i] = new QNode();
15 }
16 }
17 public void unlock() {
18 QNode acqNode = myNode.get();
19 acqNode.state.set(State.RELEASED);
20 myNode.set(null);
21 }
22 ...
23 }
24 enum State {FREE, WAITING, RELEASED, ABORTED};
25 class QNode {
26 AtomicReference<State> state;
27 QNode pred;
28 public QNode() {
29 state = new AtomicReference<State>(State.FREE);
30 }
31 }
32 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
33 long patience = TimeUnit.MILLISECONDS.convert(time, unit);
34 long startTime = System.currentTimeMillis();
35 Backoff backoff = new Backoff(MIN_BACKOFF, MAX_BACKOFF);
36 try {
37 QNode node = acquireQNode(backoff, startTime, patience);
38 QNode pred = spliceQNode(node, startTime, patience);
39 waitForPredecessor(pred, node, startTime, patience);
40 return true;
41 } catch (TimeoutException e) {
42 return false;
43 }
44 }
45 private QNode acquireQNode(Backoff backoff, long startTime, long patience)
46 throws TimeoutException, InterruptedException {
47 QNode node = waiting[ThreadLocalRandom.current().nextInt(SIZE)];
48 QNode currTail;
49 int[] currStamp = {0};
50 while (true) {
51 if (node.state.compareAndSet(State.FREE, State.WAITING)) {
52 return node;
53 }
54 currTail = tail.get(currStamp);
55 State state = node.state.get();
56 if (state == State.ABORTED || state == State.RELEASED) {
57 if (node == currTail) {
58 QNode myPred = null;
59 if (state == State.ABORTED) {
60 myPred = node.pred;
61 }
62 if (tail.compareAndSet(currTail, myPred, currStamp[0], currStamp[0]+1)) {
63 node.state.set(State.WAITING);
64 return node;
65 }
66 }
67 }
68 backoff.backoff();
69 if (timeout(patience, startTime)) {
70 throw new TimeoutException();
71 }
72 }
73 }
74 private QNode spliceQNode(QNode node, long startTime, long patience)
75 throws TimeoutException {
76 QNode currTail;
77 int[] currStamp = {0};
78 do {
79 currTail = tail.get(currStamp);
80 if (timeout(startTime, patience)) {
81 node.state.set(State.FREE);
82 throw new TimeoutException();
83 }
84 } while (!tail.compareAndSet(currTail, node, currStamp[0], currStamp[0]+1));
85 return currTail;
86 }
87 private void waitForPredecessor(QNode pred, QNode node,
88 long startTime, long patience)
89 throws TimeoutException {
90 int[] stamp = {0};
91 if (pred == null) {
92 myNode.set(node);
93 return;
94 }
95 State predState = pred.state.get();
96 while (predState != State.RELEASED) {
97 if (predState == State.ABORTED) {
98 QNode temp = pred;
99 pred = pred.pred;
100 temp.state.set(State.FREE);
101 }
102 if (timeout(patience, startTime)) {
103 node.pred = pred;
104 node.state.set(State.ABORTED);
105 throw new TimeoutException();
106 }
107 predState = pred.state.get();
108 }
109 pred.state.set(State.FREE);
110 myNode.set(node);
111 return;
112 }
7.9 快速锁?
第八章 条件变量
因为我们能让线程自旋保持锁的争用,所以如果是自旋锁,我们可以保证总会有线程抢到锁。但是如果做的是睡眠操作而且没有条件变量,那就蛋疼了,如果做互斥唤醒的话我们必须得有条件变量来帮忙我们执行,否则我们没办法唤醒某个具体的线程。得看看C11的内置条件变量是怎么回事。
所有的语言都有无法唤醒(lost wake up)问题,有两种解决无法唤醒问题的方法:
- Always signal all processes waiting on a condition, not just one.
- Specify a timeout when waiting
这里面有几个地方需要注意,一个是虚假唤醒(spurious wakeup),另一个是惊群。C++11现在并没有解决虚假唤醒的问题。目前很多人把惊群也当作虚拟唤醒的一种。如果只看维基百科,惊群也是虚假唤醒的一种,但是在早期的版本,会出现没发信号就唤醒的情况:
- 惊群:导致一个提前被唤醒的线程把资源给抢了,后面唤醒的线程发现条件不满足
- spurious wakeup 如果有惊群效应导致一个提前被唤醒的线程把资源给抢了,后面唤醒的线程发现条件不满足,这个也叫做虚假唤醒。也有可能是根本就没有生产者线程发信号,wait就直接返回了,这也是虚假唤醒。不同的平台提供不同的保证,pthread说是不会出现第二个情况。具体虚假唤醒的定义看维基百科https://en.wikipedia.org/wiki/Spurious_wakeup
8.3 读写锁
读写锁两个特性,实际上书里给的两种读写锁都是用的可冲入锁,什么意思,可以多次进入一个锁:
- No thread can acquire the write lock while any thread holds either the write lock or the read lock.
- No thread can acquire the read lock while any thread holds the write lock.
光看这两个条件,有没有这种感觉,因为写锁获取之前必须所有的读锁/写锁都释放掉,所以和上面对应起来有种
-
write.lock(): while(还有读锁/写锁活着) { condition.wait(); }
-
read.lock(): while(写锁活着) { condition.wait(); }
看看读写锁的代码实现,会发现可重入锁是关键,可重入锁保护的是谁?保护的实际上是write & readAcquires & redReleases对象,也就是锁内部的关键数据。但是我感觉这个实现没有避开多读者之间的竞争啊,还是有问题,当然也确实在一定程度上降低了读者的竞争。可重入锁的代码啥的看下面:
23 private class ReadLock implements Lock {
24 public void lock() {
25 lock.lock();
26 try {
27 while (writer)
28 condition.await();
29 readAcquires++;
30 } finally {
31 lock.unlock();
32 }
33 }
34 public void unlock() {
35 lock.lock();
36 try {
37 readReleases++;
38 if (readAcquires == readReleases)
39 condition.signalAll();
40 } finally {
41 lock.unlock();
42 }
43 }
44 }
45 private class WriteLock implements Lock {
46 public void lock() {
47 lock.lock();
48 try {
49 while (writer)
50 condition.await();
51 writer = true;
52 while (readAcquires != readReleases)
53 condition.await();
54 } finally {
55 lock.unlock();
56 }
57 }
58 public void unlock() {
59 lock.lock();
60 try {
61 writer = false;
62 condition.signalAll();
63 } finally {
64 lock.unlock();
65 }
66 }
67 }
8.4 可重入锁
没想到吧JOJO,可重入锁才是本章节的关键啊,哈哈哈,实在是没话可说,这本书真的是不讲人话的典型啊。
第九章 链表并发
这一章实际上是得意识到追求绝对的准确和减少竞争避免问题出现是很难一起实现的
9.1-9.3 一堆理念的废话
9.3这节的重点并不是,所谓的线性/链表实现blablabla之类的东西,而是从不变式到抽象不变式,再到属性检查的部分是它的精髓,从某种意义来说也是最大的败笔,我在看实现方面的理念,你给我扯一堆没用的东西,然后说了一堆抽象的废话,就忽然结束了。
我们知道缓存的不变式(invariant)是SWMR和DATA-VALUE,对抽象数据结构而言,不变式是什么?嗯,我目前也不知道,不过不变式应该满足下面这两条。这些不变式也被叫做freedom from interference:
- the property holds when the object is created, and
- once the property holds, no thread can take a step that makes it false.
类比到我们这个例子里面,我们需要看每个同步时候的指令到底干了些啥,比方说insert(),remove()。在通过方法研究这些属性的时候,要能区分开他们代表的抽象和具体实际代码。抽象不变式只对对象抽象有效果,实现的不变式要跟着具体实现变化。下面的每节就是看各种不同级别的实现。
9.4-9.5 各种粒度同步(锁)
细粒度锁为了能够保证先后的正确,因此必须,“手把手”的去获取锁。换言之:it acquires the lock for a node (except the head node) while holding (i.e., before releasing) the lock for its predecessor. 在获取某个节点锁之前要先获取它之前的锁。这个行为被称作lock coupling锁耦合(也叫做连接器)。这要求每个线程获取耦合锁的时候,具体获取耦合锁里面的一个锁的先后顺序要一致!
9.6 乐观同步(锁)
嗯,那个出租车司机的笑话挺形象的。感觉除了这个之外没啥值得看额。。。也不完全是,对链表的插入和查询都不需要锁,或者说,拿个读锁去操作,写锁做别的事情。之所以这个能这么干,有个很大的原因就是删除操作并不是把单元的内存释放了,而是把关联的关系去掉了。但是,被删除的节点的->next属性依然指向下个节点。也就是说即使你拿到的是无效了的指针,它依然能找到下个元素。
那么这里面乐观了什么?这里实际上就是假设当我进行删除/查找/插入操作的时候,不会有其他的进程让我已经获得的节点失效。我这里面得意识到:
- 先读的关系,后获取的锁,所以需要validate。
- 如果是先获取的锁,后读的关系,那就不需要validate了。这就是乐观同步和细粒度锁的区别。
具体乐观锁的优化就是懒同步,下面是乐观同步的代码,主要有个问题是它的访问需要加锁,那么怎么解决呢?。
1 public boolean add(T item) { //增加
2 int key = item.hashCode();
3 while (true) {
4 Node pred = head;
5 Node curr = pred.next;
6 while (curr.key < key) {
7 pred = curr; curr = curr.next;
8 }
9 pred.lock();
10 try {
11 curr.lock();
12 try {
13 if (validate(pred, curr)) {
14 if (curr.key == key) {
15 return false;
16 } else {
17 Node node = new Node(item);
18 node.next = curr;
19 pred.next = node;
20 return true;
21 }
22 }
23 } finally {
24 curr.unlock();
25 }
26 } finally {
27 pred.unlock();
28 }
29 }
30 }
31 public boolean remove(T item) {//这个是删除操作
32 int key = item.hashCode();
33 while (true) {
34 Node pred = head;
35 Node curr = pred.next;
36 while (curr.key < key) {
37 pred = curr; curr = curr.next;
38 }
39 pred.lock();
40 try {
41 curr.lock();
42 try {
43 if (validate(pred, curr)) {
44 if (curr.key == key) {
45 pred.next = curr.next;
46 return true;
47 } else {
48 return false;
49 }
50 }
51 } finally {
52 curr.unlock();
53 }
54 } finally {
55 pred.unlock();
56 }
57 }
58 }
9.7 懒同步(锁)
懒同步实际上就是给每个节点加个标签,来表示它是不是有效的。这个东西实际上相当于一大堆读锁,每次发现某个数据没用了就直接CompareAndSet(Valid=false)。如果需要添加的节点的key已经存在,先改变值,再改变valid属性。每次发现数量太多了,不够用了再唤醒写锁。实际上你猜我想起来了什么,我想起来了mem_sem这个东西,这也是lasy_synchronization同步。
当着这个东西有个问题,就是会导致执行真正删除的时候很麻烦,为了能够detach节点,需要检查现在的detach的节点是不是有效的,相比于乐观同步,懒同步在移除节点的时候先标记无效,再移除关联关系。但是因为有可能找到无效的指针,所以检索找不到的可能性存在(具体见pdf的P218)
33 public boolean contains(T item) {
34 int key = item.hashCode();
35 Node curr = head;
36 while (curr.key < key)
37 curr = curr.next;
38 return curr.key == key && !curr.marked;
39 }
9.8 无锁同步
dada~到了最后就是重头戏了,无锁同步。本质就是同时看两个条件是不是满足,实质还是锁
建议看看这个C++11的例子,很有趣https://stackoverflow.com/questions/40247249/what-is-the-c-11-atomic-library-equivalent-of-javas-atomicmarkablereferencet和具体的atomic比较的例子https://www.cnblogs.com/haippy/p/3301408.html这部分还是建议直接看原理吧。
下面的代码里,window负责找到之前的节点和当前的节点
1 class Window {
2 public Node pred, curr;
3 Window(Node myPred, Node myCurr) {
4 pred = myPred; curr = myCurr;
5 }
6 }
7 Window find(Node head, int key) {
8 Node pred = null, curr = null, succ = null;
9 boolean[] marked = {false};
10 boolean snip;
11 retry: while (true) {
12 pred = head;
13 curr = pred.next.getReference();
14 while (true) {
15 succ = curr.next.get(marked);
16 while (marked[0]) {
17 snip = pred.next.compareAndSet(curr, succ, false, false);
18 if (!snip) continue retry;
19 curr = succ;
20 succ = curr.next.get(marked);
21 }
22 if (curr.key >= key)
23 return new Window(pred, curr);
24 pred = curr;
25 curr = succ;
26 }
27 }
28 }
第十章:队列,内存管理和ABA问题
对pool(池)而言,操作分为部分和全部,部分是需要满足条件才能执行,如果条件不满足就阻塞的操作,全部是指立刻发生,或者返回错误码或者返回结果的操作。如果一个部分操作,需要另一个方法和它重叠才能发生,就说这个方法是同步的。
池提供了不同的公平性保证,如FIFO(队列),和LIFO(栈)。这章主要说队列。
10.1-10.3有限同步队列的实现
两个条件变量,两个锁,下面的代码很巧妙的避免了“唤醒丢失”问题,这里要注意,为什么发现为空的时候要拿到deqLock呢?因为有唤醒丢失的问题,为什么会有唤醒丢失?因为:
- 对于发布唤醒的线程而言(我们这里以enqueue为例),虽然size是原子变量,但是从对size的判断再到
mustWakeDequeuers = true;
中间是有一个过程的,我们必须保证deq线程不能在notEmptyCondition.signalAll();
之后进入睡眠,所以需要用deqLock进行保护。同理看deq算法 - 对于准备被唤醒的线程而言(我们这里以dequeue为例),虽然size是原子变量,但是从对size的判断再到
notFullCondition
条件变量的等待是有个过程的,因此需要让enqueue线程获得enqLock来保证先后性。
有限同步队列的问题是,每次都要操作两个锁deqLock和enqLock,同时enq和deq操作都要动size,因此将size拆分为enqSideSize 和deqSideSize 。队列的实际上都市enqSideSize+deqSideSize。enq操作检测enqSideSize大小,小于容量则继续。如果到达容量就锁住deqLock,把deqSideSize加到enqSideSize,然后重置deqSideSize为0
1 public class BoundedQueue<T> {
2 ReentrantLock enqLock, deqLock;
3 Condition notEmptyCondition, notFullCondition;
4 AtomicInteger size;
5 volatile Node head, tail;
6 final int capacity;
7 public BoundedQueue(int _capacity) {
8 capacity = _capacity;
9 head = new Node(null);
10 tail = head;
11 size = new AtomicInteger(0);
12 enqLock = new ReentrantLock();
13 notFullCondition = enqLock.newCondition();
14 deqLock = new ReentrantLock();
15 notEmptyCondition = deqLock.newCondition();
16 }
17 ...
18 }
19 protected class Node {
20 public T value;
21 public volatile Node next;
22 public Node(T x) {
23 value = x;
24 next = null;
25 }
26 }
27 public void enq(T x) {
28 boolean mustWakeDequeuers = false;
29 Node e = new Node(x);
30 enqLock.lock();
31 try {
32 while (size.get() == capacity)
33 notFullCondition.await();
34 tail.next = e;
35 tail = e;
36 if (size.getAndIncrement() == 0)
37 mustWakeDequeuers = true;
38 } finally {
39 enqLock.unlock();
40 }
41 if (mustWakeDequeuers) {
42 deqLock.lock();
43 try {
44 notEmptyCondition.signalAll();
45 } finally {
46 deqLock.unlock();
47 }
48 }
49 }
50 public T deq() {
51 T result;
52 boolean mustWakeEnqueuers = false;
53 deqLock.lock();
54 try {
55 while (head.next == null)
56 notEmptyCondition.await();
57 result = head.next.value;
58 head = head.next;
59 if (size.getAndDecrement() == capacity) {
60 mustWakeEnqueuers = true;
61 }
62 } finally {
63 deqLock.unlock();
64 }
65 if (mustWakeEnqueuers) {
66 enqLock.lock();
67 try {
68 notFullCondition.signalAll();
69 } finally {
70 enqLock.unlock();
71 }
72 }
73 return result;
74 }
10.4 无限全部队列
无限同步队列不会产生死锁,enq必然成功,deq最多抛出个异常,实现非常简单。
1 public void enq(T x) {
2 Node e = new Node(x);
3 enqLock.lock();
4 try {
5 tail.next = e;
6 tail = e;
7 } finally {
8 enqLock.unlock();
9 }
10 }
11 public T deq() throws EmptyException {
12 T result;
13 deqLock.lock();
14 try {
15 if (head.next == null) {
16 throw new EmptyException();
17 }
18 result = head.next.value;
19 head = head.next;
20 } finally {
21 deqLock.unlock();
22 }
23 return result;
24 }
10.5 无锁无限队列
先简单思考,对于无锁队列(以链表方式实现而言),问题在于:
- enq的时候,怎么确保添加到最后一个节点。换言之,我一开始看到的last和最后写入的last是一个吗?
- tail->next的变化和tail的变化是同一步吗?怎么保证两个都成功?换言之,怎么保证添加新节点和挪动tail指针是原子的呢?我们必然得先添加新节点,再挪动tail指针。
相对应的,我们针对这两个问题分析:
- enq的时候,如果last不是tail,说明最后节点失效。如果next不为NULL,说明最后节点有人添加,但是未必失效。所以在此CompareAndSet(tail)节点。我感觉没必要。。。。。实际上函数里面如果发现tail不是last节点,就重新获取last节点。
- 每次操作都是确保在当时last == tail的,然后尝试添加。如果发现next不是NULL,那么再次尝试重新获取tail。如果是NULL那么CompareAndSet,CAS成功了的话,更新tail,如果不成功说明有别的更新tail了,我们不用管。实际上还是通过CAS来保证添加新节点和挪动tail指针两步虽然是分割的,但是实际执行起来是原子的。每一步的检查都在确定当前的last节点是最后一个节点。
现在分析一下,该队列。更新tail节点是lazy的,enq方法的29-31行实际上帮助了tail的挪动。
1 public class LockFreeQueue<T> {
2 AtomicReference<Node> head, tail;
3 public LockFreeQueue() {
4 Node node = new Node(null);
5 head = new AtomicReference(node);
6 tail = new AtomicReference(node);
7 }
8 ...
9 }
10 public class Node {
11 public T value;
12 public AtomicReference<Node> next;
13 public Node(T value) {
14 this.value = value;
15 next = new AtomicReference<Node>(null);
16 }
17 }
18 public void enq(T value) {
19 Node node = new Node(value);
20 while (true) {
21 Node last = tail.get();
22 Node next = last.next.get();
23 if (last == tail.get()) {
24 if (next == null) {
25 if (last.next.compareAndSet(next, node)) {
26 tail.compareAndSet(last, node);
27 return;
28 }
29 } else {
30 tail.compareAndSet(last, next);
31 }
32 }
33 }
34 }
35 public T deq() throws EmptyException {
36 while (true) {
37 Node first = head.get();
38 Node last = tail.get();
39 Node next = first.next.get();
40 if (first == head.get()) {
41 if (first == last) {
42 if (next == null) {
43 throw new EmptyException();
44 }
45 tail.compareAndSet(last, next);
46 } else {
47 T value = next.value;
48 if (head.compareAndSet(first, next))
49 return value;
50 }
51 }
52 }
53 }
10.6 内存回收和ABA问题
本节主要说的是内存回收
- 每个线程维持自己的内存回收队列是最简单的实现,自己操作自己不会引起任何同步问题。
10.7 双数数据结构
这个实际上我没看,感觉用处不大。
第11章 栈和释放(?)
11.1-11.2 无锁栈
CompareAndSet的实现,没啥问题
11.3-11.4 消除(?)
之所以无锁栈性能不好,不是因为top产生了竞争,而是因为强制要求线性并发,不能多个同时burst。这里我们引入一个观察:一个push和一个pop会相互抵消,因此如果直接取消一对push和pop,就可以不对栈做操作。实际上这里是在数组里面添加了一个缓冲垫—一个缓冲数组。但是这个方法实际上并不再是后入后出了。而且只要对数组操作就有满不满吧?此外,我得说那个投票的笑话太牛逼了,一下子把出现的问题描述的非常清楚。
11.4.1 交换类型的实现
本质上是个缓冲垫,这个里面还有个问题就是,每个线程的随机数发生器必须不一样!否则都打到一个线程上了。缓冲垫不再随机,性能随之下降,相当于退化为原本的无锁栈了。
1 public class LockFreeExchanger<T> {
2 static final int EMPTY = ..., WAITING = ..., BUSY = ...;
3 AtomicStampedReference<T> slot = new AtomicStampedReference<T>(null, 0);
4 public T exchange(T myItem, long timeout, TimeUnit unit)
5 throws TimeoutException {
6 long nanos = unit.toNanos(timeout);
7 long timeBound = System.nanoTime() + nanos;
8 int[] stampHolder = {EMPTY};
9 while (true) {
10 if (System.nanoTime() > timeBound)
11 throw new TimeoutException();
12 T yrItem = slot.get(stampHolder);
13 int stamp = stampHolder[0];
14 switch(stamp) {
15 case EMPTY:
16 if (slot.compareAndSet(yrItem, myItem, EMPTY, WAITING)) {
17 while (System.nanoTime() < timeBound) {
18 yrItem = slot.get(stampHolder);
19 if (stampHolder[0] == BUSY) {
20 slot.set(null, EMPTY);
21 return yrItem;
22 }
23 }
24 if (slot.compareAndSet(myItem, null, WAITING, EMPTY)) {
25 throw new TimeoutException();
26 } else {
27 yrItem = slot.get(stampHolder);
28 slot.set(null, EMPTY);
29 return yrItem;
30 }
31 }
32 break;
33 case WAITING:
34 if (slot.compareAndSet(yrItem, myItem, WAITING, BUSY))
35 return yrItem;
36 break;
37 case BUSY:
38 break;
39 default: // impossible
40 ...
41 }
42 }
43 }
44 }
11.4.2 缓冲垫下的并发栈
并发的缓冲垫,避免原子操作的失败。这里面rangePolicy.getRange()操作如果是小范围的,那么在线程数量不多的情况下就容易增加命中,如果范围很大,会降低线程的忙等。
1 public class EliminationBackoffStack<T> extends LockFreeStack<T> {
2 static final int capacity = ...;
3 EliminationArray<T> eliminationArray = new EliminationArray<T>(capacity);
4 static ThreadLocal<RangePolicy> policy = new ThreadLocal<RangePolicy>() {
5 protected synchronized RangePolicy initialValue() {
6 return new RangePolicy();
7 }
8
9 public void push(T value) {
10 RangePolicy rangePolicy = policy.get();
11 Node node = new Node(value);
12 while (true) {
13 if (tryPush(node)) {
14 return;
15 } else try {
16 T otherValue = eliminationArray.visit(value, rangePolicy.getRange());
17 if (otherValue == null) {
18 rangePolicy.recordEliminationSuccess();
19 return; // exchanged with pop
20 }
21 } catch (TimeoutException ex) {
22 rangePolicy.recordEliminationTimeout();
23 }
24 }
25 }
26 }
27 public T pop() throws EmptyException {
28 RangePolicy rangePolicy = policy.get();
29 while (true) {
30 Node returnNode = tryPop();
31 if (returnNode != null) {
32 return returnNode.value;
33 } else try {
34 T otherValue = eliminationArray.visit(null, rangePolicy.getRange());
35 if (otherValue != null) {
36 rangePolicy.recordEliminationSuccess();
37 return otherValue;
38 }
39 } catch (TimeoutException ex) {
40 rangePolicy.recordEliminationTimeout();
41 }
42 }
43 }
第十二章:计数,排序和分布式协作
两种方式衡量并发,很神奇的是,对于分布式系统而言,这两个并不是互相矛盾的.:
- latency, the time it takes an individual method call to complete
- throughput, the overall rate at which method calls complete
12.3 软件合并
合并树,本质是每次都让主动的进程去做操作。但是这个实现并不是那么简单,每个节点都要自旋一段时间,来保证两个线程可以碰上对方,因此给个节点要有自己的状态码。不过想一想合并树这个操作真的还挺骚,但是在最极端的情况下,这个算法并不怎么好使
1 public class Node {
2 enum CStatus{IDLE, FIRST, SECOND, RESULT, ROOT};
3 boolean locked;
4 CStatus cStatus;
5 int firstValue, secondValue;
6 int result;
7 Node parent;
8 public Node() {
9 cStatus = CStatus.ROOT;
10 locked = false;
11 }
12 public Node(Node myParent) {
13 parent = myParent;
14 cStatus = CStatus.IDLE;
15 locked = false;
16 }
17 ...
18 }
合并树代码
合并树的preCombine方法需要保证原子吧,感觉有点问题啊。
1 public CombiningTree(int width) {
2 Node[] nodes = new Node[2 * width - 1];
3 nodes[0] = new Node();
4 for (int i = 1; i < nodes.length; i++) {
5 nodes[i] = new Node(nodes[(i-1)/2]);
6 }
7 leaf = new Node[width];
8 for (int i = 0; i < leaf.length; i++) {
9 leaf[i] = nodes[nodes.length - i - 1];
10 }
11 }
12 public int getAndIncrement() {
13 Stack<Node> stack = new Stack<Node>();
14 Node myLeaf = leaf[ThreadID.get()/2];
15 Node node = myLeaf;
16 // precombining phase
17 while (node.precombine()) {
18 node = node.parent;
19 }
20 Node stop = node;
21 // combining phase
22 int combined = 1;
23 for (node = myLeaf; node != stop; node = node.parent) {
24 combined = node.combine(combined);
25 stack.push(node);
26 }
27 // operation phase
28 int prior = stop.op(combined);
29 // distribution phase
30 while (!stack.empty()) {
31 node = stack.pop();
32 node.distribute(prior);
33 }
34 return prior;
35 }
对流程的理解还是得看图,a和b图时A处于preCombine阶段,c图时,A负责combine负责合并底下的每个节点,它先等待B线程释放锁。d图B释放了锁,然后A获得锁开始合并。A在operation阶段,如果是操作root节点,那就直接加了。如果不是,那就将值存到左值,然后通知每个被block的节点。结果返回了,A进入分发阶段,
12.5 计数网络
唉,尴尬