1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.lang.reflect.Field;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.util.JVMClusterUtil;
35 import org.junit.AfterClass;
36 import org.junit.Assert;
37 import org.junit.Before;
38 import org.junit.BeforeClass;
39 import org.junit.Test;
40
41 import static org.junit.Assert.*;
42
43 public class TestMultiParallel {
44 private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
45 private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
46 private static final byte[] VALUE = Bytes.toBytes("value");
47 private static final byte[] QUALIFIER = Bytes.toBytes("qual");
48 private static final String FAMILY = "family";
49 private static final String TEST_TABLE = "multi_test_table";
50 private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
51 private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
52 private static final byte [][] KEYS = makeKeys();
53
54 private static final int slaves = 2;
55 @BeforeClass public static void beforeClass() throws Exception {
56 UTIL.startMiniCluster(slaves);
57 HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
58 UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
59 }
60
61 @AfterClass public static void afterClass() throws IOException {
62 UTIL.getMiniHBaseCluster().shutdown();
63 }
64
65 @Before public void before() throws IOException {
66 LOG.info("before");
67 if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
68
69 UTIL.getMiniHBaseCluster().getMaster().balance();
70 }
71 LOG.info("before done");
72 }
73
74 private static byte[][] makeKeys() {
75 byte [][] starterKeys = HBaseTestingUtility.KEYS;
76
77
78
79
80
81 int numKeys = (int) ((float) starterKeys.length * 10.33F);
82
83 List<byte[]> keys = new ArrayList<byte[]>();
84 for (int i = 0; i < numKeys; i++) {
85 int kIdx = i % starterKeys.length;
86 byte[] k = starterKeys[kIdx];
87 byte[] cp = new byte[k.length + 1];
88 System.arraycopy(k, 0, cp, 0, k.length);
89 cp[k.length] = new Integer(i % 256).byteValue();
90 keys.add(cp);
91 }
92
93
94
95
96
97 for (int i = 0; i < 100; i++) {
98 int kIdx = i % starterKeys.length;
99 byte[] k = starterKeys[kIdx];
100 byte[] cp = new byte[k.length + 1];
101 System.arraycopy(k, 0, cp, 0, k.length);
102 cp[k.length] = new Integer(i % 256).byteValue();
103 keys.add(cp);
104 }
105 return keys.toArray(new byte [][] {new byte [] {}});
106 }
107
108 @Test public void testBatchWithGet() throws Exception {
109 LOG.info("test=testBatchWithGet");
110 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
111
112
113 List<Row> puts = constructPutRequests();
114 table.batch(puts);
115
116
117 List<Row> gets = new ArrayList<Row>();
118 for (byte[] k : KEYS) {
119 Get get = new Get(k);
120 get.addColumn(BYTES_FAMILY, QUALIFIER);
121 gets.add(get);
122 }
123 Result[] multiRes = new Result[gets.size()];
124 table.batch(gets, multiRes);
125
126
127 List<Result> singleRes = new ArrayList<Result>();
128 for (Row get : gets) {
129 singleRes.add(table.get((Get) get));
130 }
131
132
133 Assert.assertEquals(singleRes.size(), multiRes.length);
134 for (int i = 0; i < singleRes.size(); i++) {
135 Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
136 KeyValue[] singleKvs = singleRes.get(i).raw();
137 KeyValue[] multiKvs = multiRes[i].raw();
138 for (int j = 0; j < singleKvs.length; j++) {
139 Assert.assertEquals(singleKvs[j], multiKvs[j]);
140 Assert.assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j]
141 .getValue()));
142 }
143 }
144 }
145
146 @Test
147 public void testBadFam() throws Exception {
148 LOG.info("test=testBadFam");
149 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
150
151 List<Row> actions = new ArrayList<Row>();
152 Put p = new Put(Bytes.toBytes("row1"));
153 p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
154 actions.add(p);
155 p = new Put(Bytes.toBytes("row2"));
156 p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
157 actions.add(p);
158
159
160
161 Object [] r = new Object[actions.size()];
162 try {
163 table.batch(actions, r);
164 fail();
165 } catch (RetriesExhaustedWithDetailsException ex) {
166 LOG.debug(ex);
167
168 assertFalse(ex.mayHaveClusterIssues());
169 }
170 assertEquals(2, r.length);
171 assertTrue(r[0] instanceof Throwable);
172 assertTrue(r[1] instanceof Result);
173 }
174
175
176
177
178
179
180
181 @Test public void testFlushCommitsWithAbort() throws Exception {
182 LOG.info("test=testFlushCommitsWithAbort");
183 doTestFlushCommits(true);
184 }
185
186 @Test public void testFlushCommitsNoAbort() throws Exception {
187 LOG.info("test=testFlushCommitsNoAbort");
188 doTestFlushCommits(false);
189 }
190
191 private void doTestFlushCommits(boolean doAbort) throws Exception {
192
193 LOG.info("get new table");
194 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
195 table.setAutoFlush(false);
196 table.setWriteBufferSize(10 * 1024 * 1024);
197
198 LOG.info("constructPutRequests");
199 List<Row> puts = constructPutRequests();
200 for (Row put : puts) {
201 table.put((Put) put);
202 }
203 LOG.info("puts");
204 table.flushCommits();
205 if (doAbort) {
206 LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0));
207
208
209
210 puts = constructPutRequests();
211 for (Row put : puts) {
212 table.put((Put) put);
213 }
214
215 table.flushCommits();
216 }
217
218 LOG.info("validating loaded data");
219 validateLoadedData(table);
220
221
222 List<JVMClusterUtil.RegionServerThread> liveRSs =
223 UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
224 int count = 0;
225 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
226 count++;
227 LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
228 }
229 LOG.info("Count=" + count);
230 Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
231 (doAbort ? 1 : 2), count);
232 for (JVMClusterUtil.RegionServerThread t: liveRSs) {
233 int regions = t.getRegionServer().getOnlineRegions().size();
234 Assert.assertTrue("Count of regions=" + regions, regions > 10);
235 }
236 LOG.info("done");
237 }
238
239 @Test public void testBatchWithPut() throws Exception {
240 LOG.info("test=testBatchWithPut");
241 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
242
243
244 List<Row> puts = constructPutRequests();
245
246 Object[] results = table.batch(puts);
247 validateSizeAndEmpty(results, KEYS.length);
248
249 if (true) {
250 UTIL.getMiniHBaseCluster().abortRegionServer(0);
251
252 puts = constructPutRequests();
253 results = table.batch(puts);
254 validateSizeAndEmpty(results, KEYS.length);
255 }
256
257 validateLoadedData(table);
258 }
259
260 @Test public void testBatchWithDelete() throws Exception {
261 LOG.info("test=testBatchWithDelete");
262 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
263
264
265 List<Row> puts = constructPutRequests();
266 Object[] results = table.batch(puts);
267 validateSizeAndEmpty(results, KEYS.length);
268
269
270 List<Row> deletes = new ArrayList<Row>();
271 for (int i = 0; i < KEYS.length; i++) {
272 Delete delete = new Delete(KEYS[i]);
273 delete.deleteFamily(BYTES_FAMILY);
274 deletes.add(delete);
275 }
276 results = table.batch(deletes);
277 validateSizeAndEmpty(results, KEYS.length);
278
279
280 for (byte[] k : KEYS) {
281 Get get = new Get(k);
282 get.addColumn(BYTES_FAMILY, QUALIFIER);
283 Assert.assertFalse(table.exists(get));
284 }
285
286 }
287
288 @Test public void testHTableDeleteWithList() throws Exception {
289 LOG.info("test=testHTableDeleteWithList");
290 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
291
292
293 List<Row> puts = constructPutRequests();
294 Object[] results = table.batch(puts);
295 validateSizeAndEmpty(results, KEYS.length);
296
297
298 ArrayList<Delete> deletes = new ArrayList<Delete>();
299 for (int i = 0; i < KEYS.length; i++) {
300 Delete delete = new Delete(KEYS[i]);
301 delete.deleteFamily(BYTES_FAMILY);
302 deletes.add(delete);
303 }
304 table.delete(deletes);
305 Assert.assertTrue(deletes.isEmpty());
306
307
308 for (byte[] k : KEYS) {
309 Get get = new Get(k);
310 get.addColumn(BYTES_FAMILY, QUALIFIER);
311 Assert.assertFalse(table.exists(get));
312 }
313
314 }
315
316 @Test public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
317 LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
318 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
319
320 List<Row> puts = new ArrayList<Row>();
321 for (int i = 0; i < 100; i++) {
322 Put put = new Put(ONE_ROW);
323 byte[] qual = Bytes.toBytes("column" + i);
324 put.add(BYTES_FAMILY, qual, VALUE);
325 puts.add(put);
326 }
327 Object[] results = table.batch(puts);
328
329
330 validateSizeAndEmpty(results, 100);
331
332
333 List<Row> gets = new ArrayList<Row>();
334 for (int i = 0; i < 100; i++) {
335 Get get = new Get(ONE_ROW);
336 byte[] qual = Bytes.toBytes("column" + i);
337 get.addColumn(BYTES_FAMILY, qual);
338 gets.add(get);
339 }
340
341 Object[] multiRes = table.batch(gets);
342
343 int idx = 0;
344 for (Object r : multiRes) {
345 byte[] qual = Bytes.toBytes("column" + idx);
346 validateResult(r, qual, VALUE);
347 idx++;
348 }
349
350 }
351
352 @Test public void testBatchWithMixedActions() throws Exception {
353 LOG.info("test=testBatchWithMixedActions");
354 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
355
356
357 Object[] results = table.batch(constructPutRequests());
358 validateSizeAndEmpty(results, KEYS.length);
359
360
361
362 List<Row> actions = new ArrayList<Row>();
363
364 byte[] qual2 = Bytes.toBytes("qual2");
365 byte[] val2 = Bytes.toBytes("putvalue2");
366
367
368 Get get = new Get(KEYS[10]);
369 get.addColumn(BYTES_FAMILY, QUALIFIER);
370 actions.add(get);
371
372
373 get = new Get(KEYS[11]);
374 get.addColumn(BYTES_FAMILY, QUALIFIER);
375 actions.add(get);
376
377
378 Put put = new Put(KEYS[10]);
379 put.add(BYTES_FAMILY, qual2, val2);
380 actions.add(put);
381
382
383 Delete delete = new Delete(KEYS[20]);
384 delete.deleteFamily(BYTES_FAMILY);
385 actions.add(delete);
386
387
388 get = new Get(KEYS[30]);
389 get.addColumn(BYTES_FAMILY, QUALIFIER);
390 actions.add(get);
391
392
393
394
395
396
397 put = new Put(KEYS[40]);
398 put.add(BYTES_FAMILY, qual2, val2);
399 actions.add(put);
400
401 results = table.batch(actions);
402
403
404
405 validateResult(results[0]);
406 validateResult(results[1]);
407 validateEmpty(results[2]);
408 validateEmpty(results[3]);
409 validateResult(results[4]);
410 validateEmpty(results[5]);
411
412
413 get = new Get(KEYS[40]);
414 get.addColumn(BYTES_FAMILY, qual2);
415 Result r = table.get(get);
416 validateResult(r, qual2, val2);
417 }
418
419
420
421 private void validateResult(Object r) {
422 validateResult(r, QUALIFIER, VALUE);
423 }
424
425 private void validateResult(Object r1, byte[] qual, byte[] val) {
426
427 Result r = (Result)r1;
428 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
429 Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
430 }
431
432 private List<Row> constructPutRequests() {
433 List<Row> puts = new ArrayList<Row>();
434 for (byte[] k : KEYS) {
435 Put put = new Put(k);
436 put.add(BYTES_FAMILY, QUALIFIER, VALUE);
437 puts.add(put);
438 }
439 return puts;
440 }
441
442 private void validateLoadedData(HTable table) throws IOException {
443
444 for (byte[] k : KEYS) {
445 LOG.info("Assert=" + Bytes.toString(k));
446 Get get = new Get(k);
447 get.addColumn(BYTES_FAMILY, QUALIFIER);
448 Result r = table.get(get);
449 Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
450 Assert.assertEquals(0, Bytes.compareTo(VALUE, r
451 .getValue(BYTES_FAMILY, QUALIFIER)));
452 }
453 }
454
455 private void validateEmpty(Object r1) {
456 Result result = (Result)r1;
457 Assert.assertTrue(result != null);
458 Assert.assertTrue(result.getRow() == null);
459 Assert.assertEquals(0, result.raw().length);
460 }
461
462 private void validateSizeAndEmpty(Object[] results, int expectedSize) {
463
464 Assert.assertEquals(expectedSize, results.length);
465 for (Object result : results) {
466 validateEmpty(result);
467 }
468 }
469
470
471
472
473
474
475
476
477
478
479
480 @Test public void testActiveThreadsCount() throws Exception{
481 HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
482 List<Row> puts = constructPutRequests();
483 table.batch(puts);
484 Field poolField = table.getClass().getDeclaredField("pool");
485 poolField.setAccessible(true);
486 ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
487 assertEquals(slaves, tExecutor.getLargestPoolSize());
488 }
489 }