1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase;
21
22 import static org.junit.Assert.assertTrue;
23
24 import java.io.File;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.lang.reflect.Field;
28 import java.security.MessageDigest;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.NavigableSet;
34 import java.util.UUID;
35
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.commons.logging.impl.Jdk14Logger;
39 import org.apache.commons.logging.impl.Log4JLogger;
40 import org.apache.hadoop.conf.Configuration;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.client.Delete;
44 import org.apache.hadoop.hbase.client.Get;
45 import org.apache.hadoop.hbase.client.HBaseAdmin;
46 import org.apache.hadoop.hbase.client.HConnection;
47 import org.apache.hadoop.hbase.client.HTable;
48 import org.apache.hadoop.hbase.client.Put;
49 import org.apache.hadoop.hbase.client.Result;
50 import org.apache.hadoop.hbase.client.ResultScanner;
51 import org.apache.hadoop.hbase.client.Scan;
52 import org.apache.hadoop.hbase.master.HMaster;
53 import org.apache.hadoop.hbase.regionserver.HRegion;
54 import org.apache.hadoop.hbase.regionserver.HRegionServer;
55 import org.apache.hadoop.hbase.regionserver.InternalScanner;
56 import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
57 import org.apache.hadoop.hbase.regionserver.Store;
58 import org.apache.hadoop.hbase.security.User;
59 import org.apache.hadoop.hbase.util.Bytes;
60 import org.apache.hadoop.hbase.util.FSUtils;
61 import org.apache.hadoop.hbase.util.Threads;
62 import org.apache.hadoop.hbase.util.Writables;
63 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
64 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
65 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
66 import org.apache.hadoop.hdfs.DFSClient;
67 import org.apache.hadoop.hdfs.DistributedFileSystem;
68 import org.apache.hadoop.hdfs.MiniDFSCluster;
69 import org.apache.hadoop.hdfs.server.namenode.NameNode;
70 import org.apache.hadoop.mapred.MiniMRCluster;
71 import org.apache.zookeeper.ZooKeeper;
72
73
74
75
76
77
78
79
80
81
82 public class HBaseTestingUtility {
  private final static Log LOG = LogFactory.getLog(HBaseTestingUtility.class);
  // Configuration read and (during mini-cluster setup) mutated by this utility.
  private Configuration conf;
  // Mini ZooKeeper cluster; null until started.
  private MiniZooKeeperCluster zkCluster = null;
  // True when the ZK cluster was supplied externally via setZkCluster();
  // in that case shutdownMiniCluster() leaves it running.
  private boolean passedZkCluster = false;
  // Mini HDFS cluster; null until started.
  private MiniDFSCluster dfsCluster = null;
  // Mini HBase cluster; null until started.
  private MiniHBaseCluster hbaseCluster = null;
  // Mini MapReduce cluster; null until started.
  private MiniMRCluster mrCluster = null;
  // Directory (usually under the build dir) holding this run's cluster data;
  // null when no cluster is running.
  private File clusterTestBuildDir = null;

  /**
   * System property key to get test directory value.
   */
  public static final String TEST_DIRECTORY_KEY = "test.build.data";

  /**
   * Default test directory used when {@link #TEST_DIRECTORY_KEY} is unset.
   */
  public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
108
  /** Creates a testing utility backed by a fresh default HBase configuration. */
  public HBaseTestingUtility() {
    this(HBaseConfiguration.create());
  }
112
  /**
   * Creates a testing utility backed by the given configuration.
   * @param conf configuration to use; retained (not copied) and mutated by
   *   cluster-startup methods
   */
  public HBaseTestingUtility(Configuration conf) {
    this.conf = conf;
  }
116
  /**
   * @return the mini HBase cluster, or null if none started.
   *   Duplicate of {@link #getHBaseCluster()} (note the casing); kept for
   *   compatibility with existing callers.
   */
  public MiniHBaseCluster getHbaseCluster() {
    return hbaseCluster;
  }
120
121
122
123
124
125
126
127
128
129
130
131
  /** @return the live configuration backing this utility (not a copy). */
  public Configuration getConfiguration() {
    return this.conf;
  }
135
136
137
138
139
140
141
142
143 public static Path getTestDir() {
144 return new Path(System.getProperty(TEST_DIRECTORY_KEY,
145 DEFAULT_TEST_DIRECTORY));
146 }
147
148
149
150
151
152
153
154
155
  /**
   * @param subdirName subdirectory name
   * @return {@code subdirName} resolved under {@link #getTestDir()}
   */
  public static Path getTestDir(final String subdirName) {
    return new Path(getTestDir(), subdirName);
  }
159
160
161
162
163
164
165
166
167
168
169
170
171
172 public File setupClusterTestBuildDir() {
173 String randomStr = UUID.randomUUID().toString();
174 String dirStr = getTestDir(randomStr).toString();
175 File dir = new File(dirStr).getAbsoluteFile();
176
177 dir.deleteOnExit();
178 return dir;
179 }
180
181
182
183
  /**
   * Throws if a cluster is already running and no explicit build path was
   * passed (an explicit path signals the caller is deliberately reusing it).
   * @param passedBuildPath build path the caller supplied, or null
   * @throws IOException if a cluster is already running here
   */
  void isRunningCluster(String passedBuildPath) throws IOException {
    if (this.clusterTestBuildDir == null || passedBuildPath != null) return;
    throw new IOException("Cluster already running at " +
      this.clusterTestBuildDir);
  }
189
190
191
192
193
194
195
196
  /**
   * Starts a MiniDFSCluster in a freshly-chosen test directory.
   * @param servers number of datanodes
   * @return the started cluster
   */
  public MiniDFSCluster startMiniDFSCluster(int servers) throws Exception {
    return startMiniDFSCluster(servers, null);
  }
200
201
202
203
204
205
206
207
208
209
  /**
   * Starts a MiniDFSCluster rooted at {@code dir} (or a fresh directory when
   * {@code dir} is null) and points this utility's configuration at it.
   * @param servers number of datanodes
   * @param dir data directory, or null to pick one
   * @return the started cluster
   */
  public MiniDFSCluster startMiniDFSCluster(int servers, final File dir)
  throws Exception {
    if (dir == null) {
      this.clusterTestBuildDir = setupClusterTestBuildDir();
    } else {
      this.clusterTestBuildDir = dir;
    }
    // MiniDFSCluster reads these system properties to locate its data dirs.
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.toString());
    System.setProperty("test.cache.data", this.clusterTestBuildDir.toString());
    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
      true, null, null, null, null);
    // Record the filesystem URI under both the new and the deprecated key so
    // old and new hadoop code alike resolve the mini DFS.
    FileSystem fs = this.dfsCluster.getFileSystem();
    this.conf.set("fs.defaultFS", fs.getUri().toString());
    this.conf.set("fs.default.name", fs.getUri().toString());
    return this.dfsCluster;
  }
232
233
234
235
236
237
  /**
   * Shuts down the mini DFS cluster if one is running.  The reference is not
   * cleared, so callers can still inspect the stopped cluster.
   */
  public void shutdownMiniDFSCluster() throws Exception {
    if (this.dfsCluster != null) {
      this.dfsCluster.shutdown();
    }
  }
244
245
246
247
248
249
250
251
  /**
   * Starts a mini ZooKeeper cluster in a freshly-chosen test directory.
   * @return the started ZK cluster
   */
  public MiniZooKeeperCluster startMiniZKCluster() throws Exception {
    return startMiniZKCluster(setupClusterTestBuildDir());
  }
256
  /**
   * Starts a mini ZooKeeper cluster in {@code dir} and records its client
   * port into this utility's configuration.
   * @param dir data directory for ZK
   * @throws IOException if a ZK cluster is already running
   */
  private MiniZooKeeperCluster startMiniZKCluster(final File dir)
  throws Exception {
    // This cluster was started by us, so shutdownMiniCluster() may stop it.
    this.passedZkCluster = false;
    if (this.zkCluster != null) {
      throw new IOException("Cluster already running at " + dir);
    }
    this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration());
    int clientPort = this.zkCluster.startup(dir);
    this.conf.set("hbase.zookeeper.property.clientPort",
      Integer.toString(clientPort));
    return this.zkCluster;
  }
269
270
271
272
273
274
275
  /** Shuts down the mini ZK cluster if one is running and clears the reference. */
  public void shutdownMiniZKCluster() throws IOException {
    if (this.zkCluster != null) {
      this.zkCluster.shutdown();
      this.zkCluster = null;
    }
  }
282
283
284
285
286
287
288
  /**
   * Starts a full mini cluster (DFS + ZK + HBase) with one master and one
   * regionserver/datanode.
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1, 1);
  }
292
293
294
295
296
297
298
299
300
301
302
303
304
305
  /**
   * Starts a full mini cluster with one master and {@code numSlaves}
   * regionservers/datanodes.
   * @param numSlaves number of slaves (regionservers and datanodes)
   */
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    return startMiniCluster(1, numSlaves);
  }
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
  /**
   * Starts a full mini cluster: DFS, ZK (unless one was already supplied),
   * then HBase.
   * @param numMasters number of masters
   * @param numSlaves number of regionservers; also the datanode count
   * @return the started HBase cluster
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
    final int numSlaves)
  throws Exception {
    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
      numSlaves + " regionserver(s) and datanode(s)");
    // An explicit TEST_DIRECTORY_KEY lets the caller reuse a running cluster dir.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
    if (testBuildPath != null) {
      LOG.info("Using passed path: " + testBuildPath);
    }
    this.clusterTestBuildDir = testBuildPath == null?
      setupClusterTestBuildDir() : new File(testBuildPath);
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
    // Bring up DFS first and wait until it is serving.
    startMiniDFSCluster(numSlaves, this.clusterTestBuildDir);
    this.dfsCluster.waitClusterUp();
    // Only start ZK if the caller did not hand us one via setZkCluster().
    if (this.zkCluster == null) {
      startMiniZKCluster(this.clusterTestBuildDir);
    }
    return startMiniHBaseCluster(numMasters, numSlaves);
  }
354
355
356
357
358
359
360
361
362
363
364
365
366 public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
367 final int numSlaves)
368 throws IOException, InterruptedException {
369
370 createRootDir();
371 Configuration c = new Configuration(this.conf);
372 this.hbaseCluster = new MiniHBaseCluster(c, numMasters, numSlaves);
373
374 HTable t = new HTable(c, HConstants.META_TABLE_NAME);
375 ResultScanner s = t.getScanner(new Scan());
376 while (s.next() != null) {
377 continue;
378 }
379 LOG.info("Minicluster is up");
380 return this.hbaseCluster;
381 }
382
383
384
385
386
387
388
389 public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
390 this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
391
392 HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
393 ResultScanner s = t.getScanner(new Scan());
394 while (s.next() != null) {
395 continue;
396 }
397 LOG.info("HBase has been restarted");
398 }
399
400
401
402
403
404
  /** @return the mini HBase cluster, or null if none started. */
  public MiniHBaseCluster getMiniHBaseCluster() {
    return this.hbaseCluster;
  }
408
409
410
411
412
413
  /**
   * Shuts down HBase, then ZK (only if we started it ourselves), then DFS,
   * and finally removes the cluster's on-disk test directory.
   */
  public void shutdownMiniCluster() throws IOException {
    LOG.info("Shutting down minicluster");
    shutdownMiniHBaseCluster();
    // Leave an externally-supplied ZK cluster running for its owner.
    if (!this.passedZkCluster) shutdownMiniZKCluster();
    if (this.dfsCluster != null) {
      this.dfsCluster.shutdown();
    }
    // Clean up the cluster's data directory on the local filesystem.
    if (this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
      if (!FSUtils.deleteDirectory(FileSystem.getLocal(this.conf),
          new Path(this.clusterTestBuildDir.toString()))) {
        LOG.warn("Failed delete of " + this.clusterTestBuildDir.toString());
      }
      this.clusterTestBuildDir = null;
    }
    LOG.info("Minicluster is down");
  }
433
434
435
436
437
438 public void shutdownMiniHBaseCluster() throws IOException {
439 if (this.hbaseCluster != null) {
440 this.hbaseCluster.shutdown();
441
442 this.hbaseCluster.join();
443 }
444 this.hbaseCluster = null;
445 }
446
447
448
449
450
451
452
453
454
  /**
   * Creates the HBase root dir (the filesystem home directory), records it in
   * the configuration, and writes the hbase version file into it.
   * @return the root dir path
   */
  public Path createRootDir() throws IOException {
    FileSystem fs = FileSystem.get(this.conf);
    Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
    this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
    fs.mkdirs(hbaseRootdir);
    // Lay down the version file so masters accept this as a valid root dir.
    FSUtils.setVersion(fs, hbaseRootdir);
    return hbaseRootdir;
  }
463
464
465
466
467
  /** Flushes all region caches in the mini HBase cluster. */
  public void flush() throws IOException {
    this.hbaseCluster.flushcache();
  }
471
472
473
474
475
  /** Flushes all caches of the named table in the mini HBase cluster. */
  public void flush(byte [] tableName) throws IOException {
    this.hbaseCluster.flushcache(tableName);
  }
479
480
481
482
483
484
485
486
487
  /**
   * Creates a table with a single column family.
   * @return an HTable for the new table
   */
  public HTable createTable(byte[] tableName, byte[] family)
  throws IOException{
    return createTable(tableName, new byte[][]{family});
  }
492
493
494
495
496
497
498
499
  /**
   * Creates a table with the given column families, using a private copy of
   * this utility's configuration for the returned HTable.
   * @return an HTable for the new table
   */
  public HTable createTable(byte[] tableName, byte[][] families)
  throws IOException {
    return createTable(tableName, families,
      new Configuration(getConfiguration()));
  }
505
506
507
508
509
510
511
512
513
  /**
   * Creates a table with the given families and returns an HTable built from
   * the supplied configuration.
   * @param c configuration for the returned HTable
   * @return an HTable for the new table
   */
  public HTable createTable(byte[] tableName, byte[][] families,
    final Configuration c)
  throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for(byte[] family : families) {
      desc.addFamily(new HColumnDescriptor(family));
    }
    getHBaseAdmin().createTable(desc);
    return new HTable(c, tableName);
  }
524
525
526
527
528
529
530
531
532
  /**
   * Creates a table with one family keeping up to {@code numVersions} versions.
   * @return an HTable for the new table
   */
  public HTable createTable(byte[] tableName, byte[] family, int numVersions)
  throws IOException {
    return createTable(tableName, new byte[][]{family}, numVersions);
  }
537
538
539
540
541
542
543
544
545
  /**
   * Creates a table whose families all keep up to {@code numVersions}
   * versions; other family attributes take their defaults.
   * @return an HTable for the new table
   */
  public HTable createTable(byte[] tableName, byte[][] families,
    int numVersions)
  throws IOException {
    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (byte[] family : families) {
      // Integer.MAX_VALUE here is the blocksize-position argument of this
      // legacy constructor (effectively "no limit").
      HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions,
          HColumnDescriptor.DEFAULT_COMPRESSION,
          HColumnDescriptor.DEFAULT_IN_MEMORY,
          HColumnDescriptor.DEFAULT_BLOCKCACHE,
          Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
          HColumnDescriptor.DEFAULT_BLOOMFILTER,
          HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
      desc.addFamily(hcd);
    }
    getHBaseAdmin().createTable(desc);
    return new HTable(new Configuration(getConfiguration()), tableName);
  }
563
564
565
566
567
568
569
570
571
572 public HTable createTable(byte[] tableName, byte[][] families,
573 int[] numVersions)
574 throws IOException {
575 HTableDescriptor desc = new HTableDescriptor(tableName);
576 int i = 0;
577 for (byte[] family : families) {
578 HColumnDescriptor hcd = new HColumnDescriptor(family, numVersions[i],
579 HColumnDescriptor.DEFAULT_COMPRESSION,
580 HColumnDescriptor.DEFAULT_IN_MEMORY,
581 HColumnDescriptor.DEFAULT_BLOCKCACHE,
582 Integer.MAX_VALUE, HColumnDescriptor.DEFAULT_TTL,
583 HColumnDescriptor.DEFAULT_BLOOMFILTER,
584 HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
585 desc.addFamily(hcd);
586 i++;
587 }
588 getHBaseAdmin().createTable(desc);
589 return new HTable(new Configuration(getConfiguration()), tableName);
590 }
591
592
593
594
595
  /** Disables and then deletes the named table. */
  public void deleteTable(byte[] tableName) throws IOException {
    HBaseAdmin admin = new HBaseAdmin(getConfiguration());
    admin.disableTable(tableName);
    admin.deleteTable(tableName);
  }
601
602
603
604
605
606
607
608 public HTable truncateTable(byte [] tableName) throws IOException {
609 HTable table = new HTable(getConfiguration(), tableName);
610 Scan scan = new Scan();
611 ResultScanner resScan = table.getScanner(scan);
612 for(Result res : resScan) {
613 Delete del = new Delete(res.getRow());
614 table.delete(del);
615 }
616 resScan = table.getScanner(scan);
617 return table;
618 }
619
620
621
622
623
624
625
626
  /**
   * Loads the table with 26^3 rows keyed 'aaa'..'zzz', one cell per row in
   * family {@code f} whose value equals the row key.  NOTE: leaves the
   * table's autoFlush disabled.
   * @return number of rows written (17576)
   */
  public int loadTable(final HTable t, final byte[] f) throws IOException {
    // Batch the puts; flushed explicitly at the end.
    t.setAutoFlush(false);
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, null, k);
          t.put(put);
          rowCount++;
        }
      }
    }
    t.flushCommits();
    return rowCount;
  }
647
648
649
650
651
652
653
  /**
   * Loads a region directly with 26^3 rows keyed 'aaa'..'zzz', one cell per
   * row in family {@code f} whose value equals the row key.
   * @return number of rows written (17576)
   */
  public int loadRegion(final HRegion r, final byte[] f)
  throws IOException {
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, null, k);
          // A region with no WAL cannot accept WAL writes; skip them.
          if (r.getLog() == null) put.setWriteToWAL(false);
          r.put(put);
          rowCount++;
        }
      }
    }
    return rowCount;
  }
674
675
676
677
678 public int countRows(final HTable table) throws IOException {
679 Scan scan = new Scan();
680 ResultScanner results = table.getScanner(scan);
681 int count = 0;
682 for (@SuppressWarnings("unused") Result res : results) {
683 count++;
684 }
685 results.close();
686 return count;
687 }
688
689
690
691
692 public String checksumRows(final HTable table) throws Exception {
693 Scan scan = new Scan();
694 ResultScanner results = table.getScanner(scan);
695 MessageDigest digest = MessageDigest.getInstance("MD5");
696 for (Result res : results) {
697 digest.update(res.getRow());
698 }
699 results.close();
700 return digest.toString();
701 }
702
703
704
705
706
707
708
709
710
  /**
   * Splits the table into multiple regions using the default {@link #KEYS}
   * split points.
   * @return number of regions created
   */
  public int createMultiRegions(HTable table, byte[] columnFamily)
  throws IOException {
    return createMultiRegions(getConfiguration(), table, columnFamily);
  }
715
  /**
   * Default region start keys: the empty key plus "bbb".."yyy", giving 25
   * regions when used with {@link #createMultiRegions(HTable, byte[])}.
   */
  public static final byte[][] KEYS = {
    HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
    Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
    Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
    Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
    Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
    Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
    Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
    Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
    Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
  };
727
728
729
730
731
732
733
734
735
  /**
   * Splits the table into multiple regions on the default {@link #KEYS}.
   * @return number of regions created
   */
  public int createMultiRegions(final Configuration c, final HTable table,
      final byte[] columnFamily)
  throws IOException {
    return createMultiRegions(c, table, columnFamily, KEYS);
  }
741
742
743
744
745
746
747
748
749
750
751 public int createMultiRegions(final Configuration c, final HTable table,
752 final byte [] family, int numRegions)
753 throws IOException {
754 if (numRegions < 3) throw new IOException("Must create at least 3 regions");
755 byte [] startKey = Bytes.toBytes("aaaaa");
756 byte [] endKey = Bytes.toBytes("zzzzz");
757 byte [][] splitKeys = Bytes.split(startKey, endKey, numRegions - 3);
758 byte [][] regionStartKeys = new byte[splitKeys.length+1][];
759 for (int i=0;i<splitKeys.length;i++) {
760 regionStartKeys[i+1] = splitKeys[i];
761 }
762 regionStartKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
763 return createMultiRegions(c, table, family, regionStartKeys);
764 }
765
  /**
   * Rewrites .META. so the table has one region per start key: inserts the
   * new region rows, deletes the table's pre-existing meta rows, flushes the
   * client's region cache, and asks the master to assign the new regions.
   * @return number of regions created
   */
  public int createMultiRegions(final Configuration c, final HTable table,
    final byte[] columnFamily, byte [][] startKeys)
  throws IOException {
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
    HTableDescriptor htd = table.getTableDescriptor();
    if(!htd.hasFamily(columnFamily)) {
      HColumnDescriptor hcd = new HColumnDescriptor(columnFamily);
      htd.addFamily(hcd);
    }
    // Capture the table's existing meta rows BEFORE inserting the new ones
    // so we only delete the old region entries below.
    List<byte[]> rows = getMetaTableRows(htd.getName());
    List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);

    int count = 0;
    for (int i = 0; i < startKeys.length; i++) {
      // Each region ends where the next one starts; the last wraps to
      // startKeys[0] (the empty key), i.e. an open-ended end key.
      int j = (i + 1) % startKeys.length;
      HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
        startKeys[i], startKeys[j]);
      Put put = new Put(hri.getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(hri));
      meta.put(put);
      LOG.info("createMultiRegions: inserted " + hri.toString());
      newRegions.add(hri);
      count++;
    }
    // Remove the original (pre-split) region rows for this table.
    for (byte[] row : rows) {
      LOG.info("createMultiRegions: deleting meta row -> " +
        Bytes.toStringBinary(row));
      meta.delete(new Delete(row));
    }
    // Flush cached region locations; they are now stale.
    HConnection conn = table.getConnection();
    conn.clearRegionCache();
    // Assign the new regions if the table is enabled.
    if (getHBaseAdmin().isTableEnabled(table.getTableName())) {
      for(HRegionInfo hri : newRegions) {
        hbaseCluster.getMaster().assignRegion(hri);
      }
    }
    return count;
  }
813
814
815
816
817
818
819
820
821
822
823
  /**
   * Inserts region rows for the given table descriptor into .META., one
   * region per start key.  Unlike {@code createMultiRegions} this does not
   * delete existing rows or assign the regions.
   * @return the HRegionInfos inserted
   */
  public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
      final HTableDescriptor htd, byte [][] startKeys)
  throws IOException {
    HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
    Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
    List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);

    int count = 0;
    for (int i = 0; i < startKeys.length; i++) {
      // Each region ends at the next start key; the last wraps to the first
      // (empty) key, i.e. an open-ended end key.
      int j = (i + 1) % startKeys.length;
      HRegionInfo hri = new HRegionInfo(htd, startKeys[i], startKeys[j]);
      Put put = new Put(hri.getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(hri));
      meta.put(put);
      LOG.info("createMultiRegionsInMeta: inserted " + hri.toString());
      newRegions.add(hri);
      count++;
    }
    return newRegions;
  }
845
846
847
848
849
850
  /**
   * Returns all row keys currently in the .META. table.
   */
  public List<byte[]> getMetaTableRows() throws IOException {
    HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
    List<byte[]> rows = new ArrayList<byte[]>();
    ResultScanner s = t.getScanner(new Scan());
    for (Result result : s) {
      LOG.info("getMetaTableRows: row -> " +
        Bytes.toStringBinary(result.getRow()));
      rows.add(result.getRow());
    }
    s.close();
    return rows;
  }
864
865
866
867
868
869
  /**
   * Returns the .META. row keys belonging to the named table (matched by
   * deserializing each row's HRegionInfo and comparing table names).
   */
  public List<byte[]> getMetaTableRows(byte[] tableName) throws IOException {
    HTable t = new HTable(new Configuration(this.conf), HConstants.META_TABLE_NAME);
    List<byte[]> rows = new ArrayList<byte[]>();
    ResultScanner s = t.getScanner(new Scan());
    for (Result result : s) {
      HRegionInfo info = Writables.getHRegionInfo(
        result.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
      HTableDescriptor desc = info.getTableDesc();
      if (Bytes.compareTo(desc.getName(), tableName) == 0) {
        LOG.info("getMetaTableRows: row -> " +
          Bytes.toStringBinary(result.getRow()));
        rows.add(result.getRow());
      }
    }
    s.close();
    return rows;
  }
888
889
890
891
892
893
894
895
896
897
898
899 public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
900 throws IOException {
901 List<byte[]> metaRows = getMetaTableRows(tableName);
902 if (metaRows == null || metaRows.size() == 0) {
903 return null;
904 }
905 int index = hbaseCluster.getServerWith(metaRows.get(0));
906 return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
907 }
908
909
910
911
912
913
914
  /** Starts a mini MapReduce cluster with 2 task trackers. */
  public void startMiniMapReduceCluster() throws IOException {
    startMiniMapReduceCluster(2);
  }
918
919
920
921
922
923
924
  /**
   * Starts a mini MapReduce cluster and records its job tracker address in
   * this utility's configuration.
   * @param servers number of task trackers
   */
  public void startMiniMapReduceCluster(final int servers) throws IOException {
    LOG.info("Starting mini mapreduce cluster...");
    // MiniMRCluster reads hadoop.log.dir from a system property.
    Configuration c = getConfiguration();
    System.setProperty("hadoop.log.dir", c.get("hadoop.log.dir"));
    c.set("mapred.output.dir", c.get("hadoop.tmp.dir"));
    mrCluster = new MiniMRCluster(servers,
      FileSystem.get(c).getUri().toString(), 1);
    LOG.info("Mini mapreduce cluster started");
    // Point clients of this configuration at the mini cluster's job tracker.
    c.set("mapred.job.tracker",
      mrCluster.createJobConf().get("mapred.job.tracker"));
  }
937
938
939
940
941 public void shutdownMiniMapReduceCluster() {
942 LOG.info("Stopping mini mapreduce cluster...");
943 if (mrCluster != null) {
944 mrCluster.shutdown();
945 }
946
947 conf.set("mapred.job.tracker", "local");
948 LOG.info("Mini mapreduce cluster stopped");
949 }
950
951
952
953
954
955
956 public void enableDebug(Class<?> clazz) {
957 Log l = LogFactory.getLog(clazz);
958 if (l instanceof Log4JLogger) {
959 ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG);
960 } else if (l instanceof Jdk14Logger) {
961 ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL);
962 }
963 }
964
965
966
967
968
  /** Forcibly expires the active master's ZooKeeper session. */
  public void expireMasterSession() throws Exception {
    HMaster master = hbaseCluster.getMaster();
    expireSession(master.getZooKeeper(), master);
  }
973
974
975
976
977
978
  /**
   * Forcibly expires the ZooKeeper session of the regionserver at the given
   * cluster index.
   */
  public void expireRegionServerSession(int index) throws Exception {
    HRegionServer rs = hbaseCluster.getRegionServer(index);
    expireSession(rs.getZooKeeper(), rs);
  }
983
  /**
   * Expires {@code nodeZK}'s ZooKeeper session by opening a second connection
   * with the same session id/password and closing it — ZK then invalidates
   * the original session.  Sleeps to let the expiry propagate, then opens a
   * .META. HTable to confirm the cluster recovered.
   */
  public void expireSession(ZooKeeperWatcher nodeZK, Server server)
  throws Exception {
    Configuration c = new Configuration(this.conf);
    String quorumServers = ZKConfig.getZKQuorumServersString(c);
    int sessionTimeout = 5 * 1000;
    ZooKeeper zk = nodeZK.getZooKeeper();
    byte[] password = zk.getSessionPasswd();
    long sessionID = zk.getSessionId();
    // Second connection with the same credentials; closing it kills the
    // shared session on the server side.
    ZooKeeper newZK = new ZooKeeper(quorumServers,
      sessionTimeout, EmptyWatcher.instance, sessionID, password);
    newZK.close();
    final long sleep = sessionTimeout * 5L;
    LOG.info("ZK Closed Session 0x" + Long.toHexString(sessionID) +
      "; sleeping=" + sleep);

    Thread.sleep(sleep);
    // Opening a META table verifies the cluster survived the expiry.
    new HTable(new Configuration(conf), HConstants.META_TABLE_NAME);
  }
1004
1005
1006
1007
1008
1009
  /** @return the mini HBase cluster, or null if none started. */
  public MiniHBaseCluster getHBaseCluster() {
    return hbaseCluster;
  }
1013
1014
1015
1016
1017
1018
1019
  /**
   * @return a new HBaseAdmin built on a private copy of the configuration
   * @throws IOException if the admin cannot connect
   */
  public HBaseAdmin getHBaseAdmin()
  throws IOException {
    return new HBaseAdmin(new Configuration(getConfiguration()));
  }
1024
1025
1026
1027
1028
1029
1030
  /** Closes the region with the given encoded/string name. */
  public void closeRegion(String regionName) throws IOException {
    closeRegion(Bytes.toBytes(regionName));
  }
1034
1035
1036
1037
1038
1039
1040
  /** Closes the named region via the admin interface. */
  public void closeRegion(byte[] regionName) throws IOException {
    HBaseAdmin admin = getHBaseAdmin();
    admin.closeRegion(regionName, null);
  }
1045
1046
1047
1048
1049
1050
1051
1052
  /** Closes the region of {@code table} containing the given row. */
  public void closeRegionByRow(String row, HTable table) throws IOException {
    closeRegionByRow(Bytes.toBytes(row), table);
  }
1056
1057
1058
1059
1060
1061
1062
1063
  /** Closes the region of {@code table} containing the given row. */
  public void closeRegionByRow(byte[] row, HTable table) throws IOException {
    HRegionLocation hrl = table.getRegionLocation(row);
    closeRegion(hrl.getRegionInfo().getRegionName());
  }
1068
  /** @return the mini ZK cluster, or null if none started. */
  public MiniZooKeeperCluster getZkCluster() {
    return zkCluster;
  }
1072
  /**
   * Uses an externally-managed ZK cluster; shutdownMiniCluster() will then
   * leave it running for its owner.
   */
  public void setZkCluster(MiniZooKeeperCluster zkCluster) {
    this.passedZkCluster = true;
    this.zkCluster = zkCluster;
  }
1077
  /** @return the mini DFS cluster, or null if none started. */
  public MiniDFSCluster getDFSCluster() {
    return dfsCluster;
  }
1081
  /** @return the filesystem named by this utility's configuration. */
  public FileSystem getTestFileSystem() throws IOException {
    return FileSystem.get(conf);
  }
1085
1086
1087
1088
1089
  /**
   * Deletes the whole test data directory.
   * @return true if it existed and was deleted
   */
  public boolean cleanupTestDir() throws IOException {
    return deleteDir(getTestDir());
  }
1093
1094
1095
1096
1097
1098
  /**
   * Deletes the named subdirectory of the test data directory.
   * @return true if it existed and was deleted
   */
  public boolean cleanupTestDir(final String subdir) throws IOException {
    return deleteDir(getTestDir(subdir));
  }
1102
1103
1104
1105
1106
1107
1108 public boolean deleteDir(final Path dir) throws IOException {
1109 FileSystem fs = getTestFileSystem();
1110 if (fs.exists(dir)) {
1111 return fs.delete(getTestDir(), true);
1112 }
1113 return false;
1114 }
1115
  /**
   * Polls every 500ms until the table is available, failing the calling test
   * (via JUnit assertTrue) once {@code timeoutMillis} has elapsed.
   */
  public void waitTableAvailable(byte[] table, long timeoutMillis)
  throws InterruptedException, IOException {
    HBaseAdmin admin = getHBaseAdmin();
    long startWait = System.currentTimeMillis();
    while (!admin.isTableAvailable(table)) {
      assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table),
        System.currentTimeMillis() - startWait < timeoutMillis);
      Thread.sleep(500);
    }
  }
1126
1127
1128
1129
1130
1131
1132
1133
  /**
   * Starts one additional regionserver if fewer than {@code num} are live.
   * Note: adds at most one server per call, so the count may still be below
   * {@code num} afterwards.
   * @return true if a new server was started
   */
  public boolean ensureSomeRegionServersAvailable(final int num)
      throws IOException {
    if (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) {
      LOG.info("Started new server=" +
        this.getHBaseCluster().startRegionServer());
      return true;
    }
    return false;
  }
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
  /**
   * Returns a test user distinct from the current one (current user's name
   * plus {@code differentiatingSuffix}, in the supergroup).  On non-HDFS
   * filesystems multiple users are pointless, so the current user is
   * returned unchanged.
   */
  public static User getDifferentUser(final Configuration c,
    final String differentiatingSuffix)
  throws IOException {
    FileSystem currentfs = FileSystem.get(c);
    if (!(currentfs instanceof DistributedFileSystem)) {
      return User.getCurrent();
    }
    String username = User.getCurrent().getName() +
      differentiatingSuffix;
    User user = User.createUserForTesting(c, username,
      new String[]{"supergroup"});
    return user;
  }
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180 public void setNameNodeNameSystemLeasePeriod(final int soft, final int hard)
1181 throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException {
1182
1183
1184
1185
1186 Field field = this.dfsCluster.getClass().getDeclaredField("nameNode");
1187 field.setAccessible(true);
1188 NameNode nn = (NameNode)field.get(this.dfsCluster);
1189 nn.namesystem.leaseManager.setLeasePeriod(100, 50000);
1190 }
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
  /**
   * If {@code stream} is a DFSClient.DFSOutputStream, sets its private
   * {@code maxRecoveryErrorCount} field via reflection (DFSOutputStream is a
   * package-private inner class, hence the lookup by simple name).  Any
   * failure is logged and swallowed — this is best-effort test plumbing.
   */
  public static void setMaxRecoveryErrorCount(final OutputStream stream,
      final int max) {
    try {
      Class<?> [] clazzes = DFSClient.class.getDeclaredClasses();
      for (Class<?> clazz: clazzes) {
        String className = clazz.getSimpleName();
        if (className.equals("DFSOutputStream")) {
          if (clazz.isInstance(stream)) {
            Field maxRecoveryErrorCountField =
              stream.getClass().getDeclaredField("maxRecoveryErrorCount");
            maxRecoveryErrorCountField.setAccessible(true);
            maxRecoveryErrorCountField.setInt(stream, max);
            break;
          }
        }
      }
    } catch (Exception e) {
      // Deliberately non-fatal: callers only lose the tweaked error count.
      LOG.info("Could not set max recovery field", e);
    }
  }
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
  /**
   * Blocks until .META. shows exactly {@code countOfRegions} rows with a
   * server assignment, re-scanning every second.  A row with no server
   * column aborts the current scan early and forces another round.
   */
  public void waitUntilAllRegionsAssigned(final int countOfRegions)
  throws IOException {
    HTable meta = new HTable(getConfiguration(), HConstants.META_TABLE_NAME);
    while (true) {
      int rows = 0;
      Scan scan = new Scan();
      scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
      ResultScanner s = meta.getScanner(scan);
      for (Result r = null; (r = s.next()) != null;) {
        byte [] b =
          r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
        if (b == null || b.length <= 0) {
          // Unassigned region: no point finishing this scan; retry below.
          break;
        }
        rows++;
      }
      s.close();
      if (rows == countOfRegions) {
        break;
      }
      LOG.info("Found=" + rows);
      Threads.sleep(1000);
    }
  }
1260
1261
1262
1263
1264
  /**
   * Executes the Get directly against a Store (bypassing the region),
   * returning the matching KeyValues.  An empty list is returned when the
   * first batch belongs to a different row, i.e. the requested row is absent.
   */
  public static List<KeyValue> getFromStoreFile(Store store,
                                                Get get) throws IOException {
    // Required before scanning so this thread sees current data under RWCC.
    ReadWriteConsistencyControl.resetThreadReadPoint();
    Scan scan = new Scan(get);
    InternalScanner scanner = (InternalScanner) store.getScanner(scan,
        scan.getFamilyMap().get(store.getFamily().getName()));

    List<KeyValue> result = new ArrayList<KeyValue>();
    scanner.next(result);
    if (!result.isEmpty()) {
      // Verify that we got the row we asked for.
      KeyValue kv = result.get(0);
      if (!Bytes.equals(kv.getRow(), get.getRow())) {
        result.clear();
      }
    }
    return result;
  }
1283
1284
1285
1286
1287
  /**
   * Convenience overload: builds a Get for {@code row} restricted to
   * {@code columns} of the store's family and delegates to
   * {@link #getFromStoreFile(Store, Get)}.
   */
  public static List<KeyValue> getFromStoreFile(Store store,
                                                byte [] row,
                                                NavigableSet<byte[]> columns
                                                ) throws IOException {
    Get get = new Get(row);
    Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap();
    s.put(store.getFamily().getName(), columns);

    return getFromStoreFile(store,get);
  }
1298 }