1   /*
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import java.io.IOException;
23  import java.lang.reflect.Field;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadPoolExecutor;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.apache.hadoop.hbase.util.JVMClusterUtil;
35  import org.junit.AfterClass;
36  import org.junit.Assert;
37  import org.junit.Before;
38  import org.junit.BeforeClass;
39  import org.junit.Test;
40  
41  import static org.junit.Assert.*;
42  
43  public class TestMultiParallel {
44    private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
45    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
46    private static final byte[] VALUE = Bytes.toBytes("value");
47    private static final byte[] QUALIFIER = Bytes.toBytes("qual");
48    private static final String FAMILY = "family";
49    private static final String TEST_TABLE = "multi_test_table";
50    private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
51    private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
52    private static final byte [][] KEYS = makeKeys();
53  
54    private static final int slaves = 2; // also used for testing HTable pool size
55    @BeforeClass public static void beforeClass() throws Exception {
56      UTIL.startMiniCluster(slaves);
57      HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
58      UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
59    }
60  
61    @AfterClass public static void afterClass() throws IOException {
62      UTIL.getMiniHBaseCluster().shutdown();
63    }
64  
65    @Before public void before() throws IOException {
66      LOG.info("before");
67      if (UTIL.ensureSomeRegionServersAvailable(slaves)) {
68        // Distribute regions
69        UTIL.getMiniHBaseCluster().getMaster().balance();
70      }
71      LOG.info("before done");
72    }
73  
74    private static byte[][] makeKeys() {
75      byte [][] starterKeys = HBaseTestingUtility.KEYS;
76      // Create a "non-uniform" test set with the following characteristics:
77      // a) Unequal number of keys per region
78  
79      // Don't use integer as a multiple, so that we have a number of keys that is
80      // not a multiple of the number of regions
81      int numKeys = (int) ((float) starterKeys.length * 10.33F);
82  
83      List<byte[]> keys = new ArrayList<byte[]>();
84      for (int i = 0; i < numKeys; i++) {
85        int kIdx = i % starterKeys.length;
86        byte[] k = starterKeys[kIdx];
87        byte[] cp = new byte[k.length + 1];
88        System.arraycopy(k, 0, cp, 0, k.length);
89        cp[k.length] = new Integer(i % 256).byteValue();
90        keys.add(cp);
91      }
92  
93      // b) Same duplicate keys (showing multiple Gets/Puts to the same row, which
94      // should work)
95      // c) keys are not in sorted order (within a region), to ensure that the
96      // sorting code and index mapping doesn't break the functionality
97      for (int i = 0; i < 100; i++) {
98        int kIdx = i % starterKeys.length;
99        byte[] k = starterKeys[kIdx];
100       byte[] cp = new byte[k.length + 1];
101       System.arraycopy(k, 0, cp, 0, k.length);
102       cp[k.length] = new Integer(i % 256).byteValue();
103       keys.add(cp);
104     }
105     return keys.toArray(new byte [][] {new byte [] {}});
106   }
107 
108   @Test public void testBatchWithGet() throws Exception {
109     LOG.info("test=testBatchWithGet");
110     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
111 
112     // load test data
113     List<Row> puts = constructPutRequests();
114     table.batch(puts);
115 
116     // create a list of gets and run it
117     List<Row> gets = new ArrayList<Row>();
118     for (byte[] k : KEYS) {
119       Get get = new Get(k);
120       get.addColumn(BYTES_FAMILY, QUALIFIER);
121       gets.add(get);
122     }
123     Result[] multiRes = new Result[gets.size()];
124     table.batch(gets, multiRes);
125 
126     // Same gets using individual call API
127     List<Result> singleRes = new ArrayList<Result>();
128     for (Row get : gets) {
129       singleRes.add(table.get((Get) get));
130     }
131 
132     // Compare results
133     Assert.assertEquals(singleRes.size(), multiRes.length);
134     for (int i = 0; i < singleRes.size(); i++) {
135       Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
136       KeyValue[] singleKvs = singleRes.get(i).raw();
137       KeyValue[] multiKvs = multiRes[i].raw();
138       for (int j = 0; j < singleKvs.length; j++) {
139         Assert.assertEquals(singleKvs[j], multiKvs[j]);
140         Assert.assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j]
141             .getValue()));
142       }
143     }
144   }
145 
146   @Test
147   public void testBadFam() throws Exception {
148     LOG.info("test=testBadFam");
149     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
150 
151     List<Row> actions = new ArrayList<Row>();
152     Put p = new Put(Bytes.toBytes("row1"));
153     p.add(Bytes.toBytes("bad_family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
154     actions.add(p);
155     p = new Put(Bytes.toBytes("row2"));
156     p.add(BYTES_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
157     actions.add(p);
158 
159     // row1 and row2 should be in the same region.
160 
161     Object [] r = new Object[actions.size()];
162     try {
163       table.batch(actions, r);
164       fail();
165     } catch (RetriesExhaustedWithDetailsException ex) {
166       LOG.debug(ex);
167       // good!
168       assertFalse(ex.mayHaveClusterIssues());
169     }
170     assertEquals(2, r.length);
171     assertTrue(r[0] instanceof Throwable);
172     assertTrue(r[1] instanceof Result);
173   }
174 
175   /**
176    * Only run one Multi test with a forced RegionServer abort. Otherwise, the
177    * unit tests will take an unnecessarily long time to run.
178    *
179    * @throws Exception
180    */
181   @Test public void testFlushCommitsWithAbort() throws Exception {
182     LOG.info("test=testFlushCommitsWithAbort");
183     doTestFlushCommits(true);
184   }
185 
186   @Test public void testFlushCommitsNoAbort() throws Exception {
187     LOG.info("test=testFlushCommitsNoAbort");
188     doTestFlushCommits(false);
189   }
190 
191   private void doTestFlushCommits(boolean doAbort) throws Exception {
192     // Load the data
193     LOG.info("get new table");
194     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
195     table.setAutoFlush(false);
196     table.setWriteBufferSize(10 * 1024 * 1024);
197 
198     LOG.info("constructPutRequests");
199     List<Row> puts = constructPutRequests();
200     for (Row put : puts) {
201       table.put((Put) put);
202     }
203     LOG.info("puts");
204     table.flushCommits();
205     if (doAbort) {
206       LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0));
207 
208       // try putting more keys after the abort. same key/qual... just validating
209       // no exceptions thrown
210       puts = constructPutRequests();
211       for (Row put : puts) {
212         table.put((Put) put);
213       }
214 
215       table.flushCommits();
216     }
217 
218     LOG.info("validating loaded data");
219     validateLoadedData(table);
220 
221     // Validate server and region count
222     List<JVMClusterUtil.RegionServerThread> liveRSs =
223       UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
224     int count = 0;
225     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
226       count++;
227       LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
228     }
229     LOG.info("Count=" + count);
230     Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
231       (doAbort ? 1 : 2), count);
232     for (JVMClusterUtil.RegionServerThread t: liveRSs) {
233       int regions = t.getRegionServer().getOnlineRegions().size();
234       Assert.assertTrue("Count of regions=" + regions, regions > 10);
235     }
236     LOG.info("done");
237   }
238 
239   @Test public void testBatchWithPut() throws Exception {
240     LOG.info("test=testBatchWithPut");
241     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
242 
243     // put multiple rows using a batch
244     List<Row> puts = constructPutRequests();
245 
246     Object[] results = table.batch(puts);
247     validateSizeAndEmpty(results, KEYS.length);
248 
249     if (true) {
250       UTIL.getMiniHBaseCluster().abortRegionServer(0);
251 
252       puts = constructPutRequests();
253       results = table.batch(puts);
254       validateSizeAndEmpty(results, KEYS.length);
255     }
256 
257     validateLoadedData(table);
258   }
259 
260   @Test public void testBatchWithDelete() throws Exception {
261     LOG.info("test=testBatchWithDelete");
262     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
263 
264     // Load some data
265     List<Row> puts = constructPutRequests();
266     Object[] results = table.batch(puts);
267     validateSizeAndEmpty(results, KEYS.length);
268 
269     // Deletes
270     List<Row> deletes = new ArrayList<Row>();
271     for (int i = 0; i < KEYS.length; i++) {
272       Delete delete = new Delete(KEYS[i]);
273       delete.deleteFamily(BYTES_FAMILY);
274       deletes.add(delete);
275     }
276     results = table.batch(deletes);
277     validateSizeAndEmpty(results, KEYS.length);
278 
279     // Get to make sure ...
280     for (byte[] k : KEYS) {
281       Get get = new Get(k);
282       get.addColumn(BYTES_FAMILY, QUALIFIER);
283       Assert.assertFalse(table.exists(get));
284     }
285 
286   }
287 
288   @Test public void testHTableDeleteWithList() throws Exception {
289     LOG.info("test=testHTableDeleteWithList");
290     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
291 
292     // Load some data
293     List<Row> puts = constructPutRequests();
294     Object[] results = table.batch(puts);
295     validateSizeAndEmpty(results, KEYS.length);
296 
297     // Deletes
298     ArrayList<Delete> deletes = new ArrayList<Delete>();
299     for (int i = 0; i < KEYS.length; i++) {
300       Delete delete = new Delete(KEYS[i]);
301       delete.deleteFamily(BYTES_FAMILY);
302       deletes.add(delete);
303     }
304     table.delete(deletes);
305     Assert.assertTrue(deletes.isEmpty());
306 
307     // Get to make sure ...
308     for (byte[] k : KEYS) {
309       Get get = new Get(k);
310       get.addColumn(BYTES_FAMILY, QUALIFIER);
311       Assert.assertFalse(table.exists(get));
312     }
313 
314   }
315 
316   @Test public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
317     LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
318     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
319 
320     List<Row> puts = new ArrayList<Row>();
321     for (int i = 0; i < 100; i++) {
322       Put put = new Put(ONE_ROW);
323       byte[] qual = Bytes.toBytes("column" + i);
324       put.add(BYTES_FAMILY, qual, VALUE);
325       puts.add(put);
326     }
327     Object[] results = table.batch(puts);
328 
329     // validate
330     validateSizeAndEmpty(results, 100);
331 
332     // get the data back and validate that it is correct
333     List<Row> gets = new ArrayList<Row>();
334     for (int i = 0; i < 100; i++) {
335       Get get = new Get(ONE_ROW);
336       byte[] qual = Bytes.toBytes("column" + i);
337       get.addColumn(BYTES_FAMILY, qual);
338       gets.add(get);
339     }
340 
341     Object[] multiRes = table.batch(gets);
342 
343     int idx = 0;
344     for (Object r : multiRes) {
345       byte[] qual = Bytes.toBytes("column" + idx);
346       validateResult(r, qual, VALUE);
347       idx++;
348     }
349 
350   }
351 
352   @Test public void testBatchWithMixedActions() throws Exception {
353     LOG.info("test=testBatchWithMixedActions");
354     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
355 
356     // Load some data to start
357     Object[] results = table.batch(constructPutRequests());
358     validateSizeAndEmpty(results, KEYS.length);
359 
360     // Batch: get, get, put(new col), delete, get, get of put, get of deleted,
361     // put
362     List<Row> actions = new ArrayList<Row>();
363 
364     byte[] qual2 = Bytes.toBytes("qual2");
365     byte[] val2 = Bytes.toBytes("putvalue2");
366 
367     // 0 get
368     Get get = new Get(KEYS[10]);
369     get.addColumn(BYTES_FAMILY, QUALIFIER);
370     actions.add(get);
371 
372     // 1 get
373     get = new Get(KEYS[11]);
374     get.addColumn(BYTES_FAMILY, QUALIFIER);
375     actions.add(get);
376 
377     // 2 put of new column
378     Put put = new Put(KEYS[10]);
379     put.add(BYTES_FAMILY, qual2, val2);
380     actions.add(put);
381 
382     // 3 delete
383     Delete delete = new Delete(KEYS[20]);
384     delete.deleteFamily(BYTES_FAMILY);
385     actions.add(delete);
386 
387     // 4 get
388     get = new Get(KEYS[30]);
389     get.addColumn(BYTES_FAMILY, QUALIFIER);
390     actions.add(get);
391 
392     // There used to be a 'get' of a previous put here, but removed
393     // since this API really cannot guarantee order in terms of mixed
394     // get/puts.
395 
396     // 5 put of new column
397     put = new Put(KEYS[40]);
398     put.add(BYTES_FAMILY, qual2, val2);
399     actions.add(put);
400 
401     results = table.batch(actions);
402 
403     // Validation
404 
405     validateResult(results[0]);
406     validateResult(results[1]);
407     validateEmpty(results[2]);
408     validateEmpty(results[3]);
409     validateResult(results[4]);
410     validateEmpty(results[5]);
411 
412     // validate last put, externally from the batch
413     get = new Get(KEYS[40]);
414     get.addColumn(BYTES_FAMILY, qual2);
415     Result r = table.get(get);
416     validateResult(r, qual2, val2);
417   }
418 
419   // // Helper methods ////
420 
421   private void validateResult(Object r) {
422     validateResult(r, QUALIFIER, VALUE);
423   }
424 
425   private void validateResult(Object r1, byte[] qual, byte[] val) {
426     // TODO provide nice assert here or something.
427     Result r = (Result)r1;
428     Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
429     Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
430   }
431 
432   private List<Row> constructPutRequests() {
433     List<Row> puts = new ArrayList<Row>();
434     for (byte[] k : KEYS) {
435       Put put = new Put(k);
436       put.add(BYTES_FAMILY, QUALIFIER, VALUE);
437       puts.add(put);
438     }
439     return puts;
440   }
441 
442   private void validateLoadedData(HTable table) throws IOException {
443     // get the data back and validate that it is correct
444     for (byte[] k : KEYS) {
445       LOG.info("Assert=" + Bytes.toString(k));
446       Get get = new Get(k);
447       get.addColumn(BYTES_FAMILY, QUALIFIER);
448       Result r = table.get(get);
449       Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
450       Assert.assertEquals(0, Bytes.compareTo(VALUE, r
451           .getValue(BYTES_FAMILY, QUALIFIER)));
452     }
453   }
454 
455   private void validateEmpty(Object r1) {
456     Result result = (Result)r1;
457     Assert.assertTrue(result != null);
458     Assert.assertTrue(result.getRow() == null);
459     Assert.assertEquals(0, result.raw().length);
460   }
461 
462   private void validateSizeAndEmpty(Object[] results, int expectedSize) {
463     // Validate got back the same number of Result objects, all empty
464     Assert.assertEquals(expectedSize, results.length);
465     for (Object result : results) {
466       validateEmpty(result);
467     }
468   }
469 
470   /**
471    * This is for testing the active number of threads that were used while
472    * doing a batch operation. It inserts one row per region via the batch
473    * operation, and then checks the number of active threads.
474    * For HBASE-3553
475    * @throws IOException
476    * @throws InterruptedException
477    * @throws NoSuchFieldException
478    * @throws SecurityException
479    */
480   @Test public void testActiveThreadsCount() throws Exception{
481     HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
482     List<Row> puts = constructPutRequests(); // creates a Put for every region
483     table.batch(puts);
484     Field poolField = table.getClass().getDeclaredField("pool");
485     poolField.setAccessible(true);
486     ThreadPoolExecutor tExecutor = (ThreadPoolExecutor) poolField.get(table);
487     assertEquals(slaves, tExecutor.getLargestPoolSize());
488   }
489 }