1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.util.LinkedList;
23
24 import org.apache.hadoop.hbase.util.Bytes;
25 import org.apache.hadoop.hbase.util.ClassSize;
26
27
28
29
30
31
32
33 public class ReadWriteConsistencyControl {
34 private volatile long memstoreRead = 0;
35 private volatile long memstoreWrite = 0;
36
37 private final Object readWaiters = new Object();
38
39
40 private final LinkedList<WriteEntry> writeQueue =
41 new LinkedList<WriteEntry>();
42
43 private static final ThreadLocal<Long> perThreadReadPoint =
44 new ThreadLocal<Long>();
45
46
47
48
49
50
51 public static long getThreadReadPoint() {
52 return perThreadReadPoint.get();
53 }
54
55
56
57
58
59
60 public static void setThreadReadPoint(long readPoint) {
61 perThreadReadPoint.set(readPoint);
62 }
63
64
65
66
67
68 public static long resetThreadReadPoint(ReadWriteConsistencyControl rwcc) {
69 perThreadReadPoint.set(rwcc.memstoreReadPoint());
70 return getThreadReadPoint();
71 }
72
73
74
75
76 public static void resetThreadReadPoint() {
77 perThreadReadPoint.set(0L);
78 }
79
80 public WriteEntry beginMemstoreInsert() {
81 synchronized (writeQueue) {
82 long nextWriteNumber = ++memstoreWrite;
83 WriteEntry e = new WriteEntry(nextWriteNumber);
84 writeQueue.add(e);
85 return e;
86 }
87 }
88
89 public void completeMemstoreInsert(WriteEntry e) {
90 synchronized (writeQueue) {
91 e.markCompleted();
92
93 long nextReadValue = -1;
94 boolean ranOnce=false;
95 while (!writeQueue.isEmpty()) {
96 ranOnce=true;
97 WriteEntry queueFirst = writeQueue.getFirst();
98
99 if (nextReadValue > 0) {
100 if (nextReadValue+1 != queueFirst.getWriteNumber()) {
101 throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
102 + nextReadValue + " next: " + queueFirst.getWriteNumber());
103 }
104 }
105
106 if (queueFirst.isCompleted()) {
107 nextReadValue = queueFirst.getWriteNumber();
108 writeQueue.removeFirst();
109 } else {
110 break;
111 }
112 }
113
114 if (!ranOnce) {
115 throw new RuntimeException("never was a first");
116 }
117
118 if (nextReadValue > 0) {
119 synchronized (readWaiters) {
120 memstoreRead = nextReadValue;
121 readWaiters.notifyAll();
122 }
123
124 }
125 }
126
127 boolean interrupted = false;
128 synchronized (readWaiters) {
129 while (memstoreRead < e.getWriteNumber()) {
130 try {
131 readWaiters.wait(0);
132 } catch (InterruptedException ie) {
133
134
135 interrupted = true;
136 }
137 }
138 }
139 if (interrupted) Thread.currentThread().interrupt();
140 }
141
142 public long memstoreReadPoint() {
143 return memstoreRead;
144 }
145
146
147 public static class WriteEntry {
148 private long writeNumber;
149 private boolean completed = false;
150 WriteEntry(long writeNumber) {
151 this.writeNumber = writeNumber;
152 }
153 void markCompleted() {
154 this.completed = true;
155 }
156 boolean isCompleted() {
157 return this.completed;
158 }
159 long getWriteNumber() {
160 return this.writeNumber;
161 }
162 }
163
164 public static final long FIXED_SIZE = ClassSize.align(
165 ClassSize.OBJECT +
166 2 * Bytes.SIZEOF_LONG +
167 2 * ClassSize.REFERENCE);
168
169 }