1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.Random;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
31 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
32 import org.apache.hadoop.hbase.client.Get;
33 import org.apache.hadoop.hbase.client.HTable;
34 import org.apache.hadoop.hbase.client.Put;
35 import org.apache.hadoop.hbase.client.Result;
36 import org.apache.hadoop.hbase.client.ResultScanner;
37 import org.apache.hadoop.hbase.client.Scan;
38 import org.apache.hadoop.hbase.util.Bytes;
39 import org.junit.Ignore;
40 import org.junit.Test;
41
42 import com.google.common.collect.Lists;
43
44
45
46
47
48
49
50
51 public class TestAcidGuarantees {
52 protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
53 public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
54 public static final byte [] FAMILY_A = Bytes.toBytes("A");
55 public static final byte [] FAMILY_B = Bytes.toBytes("B");
56 public static final byte [] FAMILY_C = Bytes.toBytes("C");
57 public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");
58
59 public static final byte[][] FAMILIES = new byte[][] {
60 FAMILY_A, FAMILY_B, FAMILY_C };
61
62 private HBaseTestingUtility util;
63
64 public static int NUM_COLS_TO_CHECK = 50;
65
66 private void createTableIfMissing()
67 throws IOException {
68 try {
69 util.createTable(TABLE_NAME, FAMILIES);
70 } catch (TableExistsException tee) {
71 }
72 }
73
74 public TestAcidGuarantees() {
75
76 Configuration conf = HBaseConfiguration.create();
77 conf.set("hbase.hregion.memstore.flush.size", String.valueOf(128*1024));
78 util = new HBaseTestingUtility(conf);
79 }
80
81
82
83
84 public static class AtomicityWriter extends RepeatingTestThread {
85 Random rand = new Random();
86 byte data[] = new byte[10];
87 byte targetRows[][];
88 byte targetFamilies[][];
89 HTable table;
90 AtomicLong numWritten = new AtomicLong();
91
92 public AtomicityWriter(TestContext ctx, byte targetRows[][],
93 byte targetFamilies[][]) throws IOException {
94 super(ctx);
95 this.targetRows = targetRows;
96 this.targetFamilies = targetFamilies;
97 table = new HTable(ctx.getConf(), TABLE_NAME);
98 }
99 public void doAnAction() throws Exception {
100
101 byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
102 Put p = new Put(targetRow);
103 rand.nextBytes(data);
104
105 for (byte[] family : targetFamilies) {
106 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
107 byte qualifier[] = Bytes.toBytes("col" + i);
108 p.add(family, qualifier, data);
109 }
110 }
111 table.put(p);
112 numWritten.getAndIncrement();
113 }
114 }
115
116
117
118
119
120 public static class AtomicGetReader extends RepeatingTestThread {
121 byte targetRow[];
122 byte targetFamilies[][];
123 HTable table;
124 int numVerified = 0;
125 AtomicLong numRead = new AtomicLong();
126
127 public AtomicGetReader(TestContext ctx, byte targetRow[],
128 byte targetFamilies[][]) throws IOException {
129 super(ctx);
130 this.targetRow = targetRow;
131 this.targetFamilies = targetFamilies;
132 table = new HTable(ctx.getConf(), TABLE_NAME);
133 }
134
135 public void doAnAction() throws Exception {
136 Get g = new Get(targetRow);
137 Result res = table.get(g);
138 byte[] gotValue = null;
139 if (res.getRow() == null) {
140
141
142
143 return;
144 }
145
146 for (byte[] family : targetFamilies) {
147 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
148 byte qualifier[] = Bytes.toBytes("col" + i);
149 byte thisValue[] = res.getValue(family, qualifier);
150 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
151 gotFailure(gotValue, res);
152 }
153 numVerified++;
154 gotValue = thisValue;
155 }
156 }
157 numRead.getAndIncrement();
158 }
159
160 private void gotFailure(byte[] expected, Result res) {
161 StringBuilder msg = new StringBuilder();
162 msg.append("Failed after ").append(numVerified).append("!");
163 msg.append("Expected=").append(Bytes.toStringBinary(expected));
164 msg.append("Got:\n");
165 for (KeyValue kv : res.list()) {
166 msg.append(kv.toString());
167 msg.append(" val= ");
168 msg.append(Bytes.toStringBinary(kv.getValue()));
169 msg.append("\n");
170 }
171 throw new RuntimeException(msg.toString());
172 }
173 }
174
175
176
177
178
179 public static class AtomicScanReader extends RepeatingTestThread {
180 byte targetFamilies[][];
181 HTable table;
182 AtomicLong numScans = new AtomicLong();
183 AtomicLong numRowsScanned = new AtomicLong();
184
185 public AtomicScanReader(TestContext ctx,
186 byte targetFamilies[][]) throws IOException {
187 super(ctx);
188 this.targetFamilies = targetFamilies;
189 table = new HTable(ctx.getConf(), TABLE_NAME);
190 }
191
192 public void doAnAction() throws Exception {
193 Scan s = new Scan();
194 for (byte[] family : targetFamilies) {
195 s.addFamily(family);
196 }
197 ResultScanner scanner = table.getScanner(s);
198
199 for (Result res : scanner) {
200 byte[] gotValue = null;
201
202 for (byte[] family : targetFamilies) {
203 for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
204 byte qualifier[] = Bytes.toBytes("col" + i);
205 byte thisValue[] = res.getValue(family, qualifier);
206 if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
207 gotFailure(gotValue, res);
208 }
209 gotValue = thisValue;
210 }
211 }
212 numRowsScanned.getAndIncrement();
213 }
214 numScans.getAndIncrement();
215 }
216
217 private void gotFailure(byte[] expected, Result res) {
218 StringBuilder msg = new StringBuilder();
219 msg.append("Failed after ").append(numRowsScanned).append("!");
220 msg.append("Expected=").append(Bytes.toStringBinary(expected));
221 msg.append("Got:\n");
222 for (KeyValue kv : res.list()) {
223 msg.append(kv.toString());
224 msg.append(" val= ");
225 msg.append(Bytes.toStringBinary(kv.getValue()));
226 msg.append("\n");
227 }
228 throw new RuntimeException(msg.toString());
229 }
230 }
231
232
233 public void runTestAtomicity(long millisToRun,
234 int numWriters,
235 int numGetters,
236 int numScanners,
237 int numUniqueRows) throws Exception {
238 createTableIfMissing();
239 TestContext ctx = new TestContext(util.getConfiguration());
240
241 byte rows[][] = new byte[numUniqueRows][];
242 for (int i = 0; i < numUniqueRows; i++) {
243 rows[i] = Bytes.toBytes("test_row_" + i);
244 }
245
246 List<AtomicityWriter> writers = Lists.newArrayList();
247 for (int i = 0; i < numWriters; i++) {
248 AtomicityWriter writer = new AtomicityWriter(
249 ctx, rows, FAMILIES);
250 writers.add(writer);
251 ctx.addThread(writer);
252 }
253
254 List<AtomicGetReader> getters = Lists.newArrayList();
255 for (int i = 0; i < numGetters; i++) {
256 AtomicGetReader getter = new AtomicGetReader(
257 ctx, rows[i % numUniqueRows], FAMILIES);
258 getters.add(getter);
259 ctx.addThread(getter);
260 }
261
262 List<AtomicScanReader> scanners = Lists.newArrayList();
263 for (int i = 0; i < numScanners; i++) {
264 AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
265 scanners.add(scanner);
266 ctx.addThread(scanner);
267 }
268
269 ctx.startThreads();
270 ctx.waitFor(millisToRun);
271 ctx.stop();
272
273 LOG.info("Finished test. Writers:");
274 for (AtomicityWriter writer : writers) {
275 LOG.info(" wrote " + writer.numWritten.get());
276 }
277 LOG.info("Readers:");
278 for (AtomicGetReader reader : getters) {
279 LOG.info(" read " + reader.numRead.get());
280 }
281 LOG.info("Scanners:");
282 for (AtomicScanReader scanner : scanners) {
283 LOG.info(" scanned " + scanner.numScans.get());
284 LOG.info(" verified " + scanner.numRowsScanned.get() + " rows");
285 }
286 }
287
288 @Test
289 @Ignore("Currently not passing - see HBASE-2856")
290 public void testGetAtomicity() throws Exception {
291 util.startMiniCluster(1);
292 try {
293 runTestAtomicity(20000, 5, 5, 0, 3);
294 } finally {
295 util.shutdownMiniCluster();
296 }
297 }
298
299 @Test
300 @Ignore("Currently not passing - see HBASE-2670")
301 public void testScanAtomicity() throws Exception {
302 util.startMiniCluster(1);
303 try {
304 runTestAtomicity(20000, 5, 0, 5, 3);
305 } finally {
306 util.shutdownMiniCluster();
307 }
308 }
309
310 @Test
311 @Ignore("Currently not passing - see HBASE-2670")
312 public void testMixedAtomicity() throws Exception {
313 util.startMiniCluster(1);
314 try {
315 runTestAtomicity(20000, 5, 2, 2, 3);
316 } finally {
317 util.shutdownMiniCluster();
318 }
319 }
320
321 public static void main(String args[]) throws Exception {
322 Configuration c = HBaseConfiguration.create();
323 TestAcidGuarantees test = new TestAcidGuarantees();
324 test.setConf(c);
325 test.runTestAtomicity(5*60*1000, 5, 2, 2, 3);
326 }
327
328 private void setConf(Configuration c) {
329 util = new HBaseTestingUtility(c);
330 }
331 }