1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.util;
21
22 import java.io.IOException;
23 import java.math.BigInteger;
24 import java.util.LinkedList;
25 import java.util.Set;
26 import java.util.TreeMap;
27
28 import org.apache.commons.cli.CommandLine;
29 import org.apache.commons.cli.GnuParser;
30 import org.apache.commons.cli.HelpFormatter;
31 import org.apache.commons.cli.OptionBuilder;
32 import org.apache.commons.cli.Options;
33 import org.apache.commons.cli.ParseException;
34 import org.apache.commons.lang.StringUtils;
35 import org.apache.commons.logging.Log;
36 import org.apache.commons.logging.LogFactory;
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.fs.FSDataInputStream;
39 import org.apache.hadoop.fs.FSDataOutputStream;
40 import org.apache.hadoop.fs.FileStatus;
41 import org.apache.hadoop.fs.FileSystem;
42 import org.apache.hadoop.fs.Path;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HColumnDescriptor;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.HRegionInfo;
47 import org.apache.hadoop.hbase.HRegionLocation;
48 import org.apache.hadoop.hbase.HServerAddress;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.client.HBaseAdmin;
51 import org.apache.hadoop.hbase.client.HTable;
52 import org.apache.hadoop.hbase.client.NoServerForRegionException;
53 import org.apache.hadoop.hbase.regionserver.Store;
54 import org.apache.hadoop.hbase.regionserver.StoreFile;
55
56 import com.google.common.base.Preconditions;
57 import com.google.common.collect.Lists;
58 import com.google.common.collect.Maps;
59 import com.google.common.collect.Sets;
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131 public class RegionSplitter {
132 static final Log LOG = LogFactory.getLog(RegionSplitter.class);
133
134
135
136
137
138
139
140
141
  /**
   * Strategy interface describing how the RegionSplitter models a table's row
   * space: how to pick split points, and how to convert rows to/from the
   * string form used in the _balancedSplit journal file.  Implementations are
   * loaded reflectively via the "split.algorithm" configuration key (default
   * {@link MD5StringSplit}), so they must have a no-argument constructor.
   */
  public static interface SplitAlgorithm {
    /**
     * Picks the row at which to split the region [start, end).
     *
     * @param start first row of the region
     * @param end last row of the region
     * @return the row to split the region on
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Computes the split points needed to divide the full row space into the
     * requested number of regions.
     *
     * @param numberOfSplits number of regions to create
     * @return the split rows (numberOfSplits - 1 entries in MD5StringSplit;
     *         presumably the same convention for other implementations —
     *         TODO confirm)
     */
    byte[][] split(int numberOfSplits);

    /**
     * @return the first (smallest) row in the algorithm's row space; used in
     *         place of HBase's empty start key
     */
    byte[] firstRow();

    /**
     * @return the last (largest) row in the algorithm's row space; used in
     *         place of HBase's empty end key
     */
    byte[] lastRow();

    /**
     * @param input a row in the printable form produced by rowToStr
     * @return the row as a byte array
     */
    byte[] strToRow(String input);

    /**
     * @param row a row as a byte array
     * @return a printable representation of the row, suitable for logging and
     *         for the _balancedSplit file
     */
    String rowToStr(byte[] row);

    /**
     * @return the field separator used between rows in the _balancedSplit
     *         file; must not occur inside a rowToStr result
     */
    String separator();
  }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229 @SuppressWarnings("static-access")
230 public static void main(String[] args) throws IOException,
231 InterruptedException, ParseException {
232 Configuration conf = HBaseConfiguration.create();
233
234
235 Options opt = new Options();
236 opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
237 .withDescription("Override HBase Configuration Settings").create("D"));
238 opt.addOption(OptionBuilder.withArgName("region count").hasArg()
239 .withDescription(
240 "Create a new table with a pre-split number of regions")
241 .create("c"));
242 opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
243 .withDescription(
244 "Column Families to create with new table. Required with -c")
245 .create("f"));
246 opt.addOption("h", false, "Print this usage help");
247 opt.addOption("r", false, "Perform a rolling split of an existing region");
248 opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
249 "Max outstanding splits that have unfinished major compactions")
250 .create("o"));
251 opt.addOption(null, "risky", false,
252 "Skip verification steps to complete quickly."
253 + "STRONGLY DISCOURAGED for production systems. ");
254 CommandLine cmd = new GnuParser().parse(opt, args);
255
256 if (cmd.hasOption("D")) {
257 for (String confOpt : cmd.getOptionValues("D")) {
258 String[] kv = confOpt.split("=", 2);
259 if (kv.length == 2) {
260 conf.set(kv[0], kv[1]);
261 LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
262 } else {
263 throw new ParseException("-D option format invalid: " + confOpt);
264 }
265 }
266 }
267
268 if (cmd.hasOption("risky")) {
269 conf.setBoolean("split.verify", false);
270 }
271
272 boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
273 boolean rollingSplit = cmd.hasOption("r");
274 boolean oneOperOnly = createTable ^ rollingSplit;
275
276 if (1 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
277 new HelpFormatter().printHelp("RegionSplitter <TABLE>", opt);
278 return;
279 }
280 String tableName = cmd.getArgs()[0];
281
282 if (createTable) {
283 conf.set("split.count", cmd.getOptionValue("c"));
284 createPresplitTable(tableName, cmd.getOptionValue("f").split(":"), conf);
285 }
286
287 if (rollingSplit) {
288 if (cmd.hasOption("o")) {
289 conf.set("split.outstanding", cmd.getOptionValue("o"));
290 }
291 rollingSplit(tableName, conf);
292 }
293 }
294
295 static void createPresplitTable(String tableName, String[] columnFamilies,
296 Configuration conf) throws IOException, InterruptedException {
297 Class<? extends SplitAlgorithm> splitClass = conf.getClass(
298 "split.algorithm", MD5StringSplit.class, SplitAlgorithm.class);
299 SplitAlgorithm splitAlgo;
300 try {
301 splitAlgo = splitClass.newInstance();
302 } catch (Exception e) {
303 throw new IOException("Problem loading split algorithm: ", e);
304 }
305 final int splitCount = conf.getInt("split.count", 0);
306 Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");
307
308 Preconditions.checkArgument(columnFamilies.length > 0,
309 "Must specify at least one column family. ");
310 LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
311 + " column families. Presplitting to " + splitCount + " regions");
312
313 HTableDescriptor desc = new HTableDescriptor(tableName);
314 for (String cf : columnFamilies) {
315 desc.addFamily(new HColumnDescriptor(Bytes.toBytes(cf)));
316 }
317 HBaseAdmin admin = new HBaseAdmin(conf);
318 Preconditions.checkArgument(!admin.tableExists(tableName),
319 "Table already exists: " + tableName);
320 admin.createTable(desc, splitAlgo.split(splitCount));
321 LOG.debug("Table created! Waiting for regions to show online in META...");
322
323 if (!conf.getBoolean("split.verify", true)) {
324
325 HTable table = new HTable(tableName);
326 int onlineRegions = 0;
327 while (onlineRegions < splitCount) {
328 onlineRegions = table.getRegionsInfo().size();
329 LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
330 if (onlineRegions < splitCount) {
331 Thread.sleep(10 * 1000);
332 }
333 }
334 }
335
336 LOG.debug("Finished creating table with " + splitCount + " regions");
337 }
338
339 static void rollingSplit(String tableName, Configuration conf)
340 throws IOException, InterruptedException {
341 Class<? extends SplitAlgorithm> splitClass = conf.getClass(
342 "split.algorithm", MD5StringSplit.class, SplitAlgorithm.class);
343 SplitAlgorithm splitAlgo;
344 try {
345 splitAlgo = splitClass.newInstance();
346 } catch (Exception e) {
347 throw new IOException("Problem loading split algorithm: ", e);
348 }
349 final int minOS = conf.getInt("split.outstanding", 2);
350
351 HTable table = new HTable(conf, tableName);
352
353
354 final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 2, minOS);
355
356 Path hbDir = new Path(conf.get(HConstants.HBASE_DIR));
357 Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
358 Path splitFile = new Path(tableDir, "_balancedSplit");
359 FileSystem fs = FileSystem.get(conf);
360
361
362 LinkedList<Pair<byte[], byte[]>> tmpRegionSet = getSplits(table, splitAlgo);
363 LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
364 int splitCount = 0;
365 final int origCount = tmpRegionSet.size();
366
367
368
369
370 LOG.debug("Bucketing regions by regionserver...");
371 TreeMap<HServerAddress, LinkedList<Pair<byte[], byte[]>>> daughterRegions = Maps
372 .newTreeMap();
373 for (Pair<byte[], byte[]> dr : tmpRegionSet) {
374 HServerAddress rsLocation = table.getRegionLocation(dr.getSecond())
375 .getServerAddress();
376 if (!daughterRegions.containsKey(rsLocation)) {
377 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
378 daughterRegions.put(rsLocation, entry);
379 }
380 daughterRegions.get(rsLocation).add(dr);
381 }
382 LOG.debug("Done with bucketing. Split time!");
383 long startTime = System.currentTimeMillis();
384
385
386 FSDataInputStream tmpIn = fs.open(splitFile);
387 byte[] rawData = new byte[tmpIn.available()];
388 tmpIn.readFully(rawData);
389 tmpIn.close();
390 FSDataOutputStream splitOut = fs.create(splitFile);
391 splitOut.write(rawData);
392
393 try {
394
395 while (!daughterRegions.isEmpty()) {
396 LOG.debug(daughterRegions.size() + " RS have regions to splt.");
397
398
399 for (HServerAddress rsLoc = daughterRegions.firstKey();
400 rsLoc != null;
401 rsLoc = daughterRegions.higherKey(rsLoc)) {
402 Pair<byte[], byte[]> dr = null;
403
404
405 LOG.debug("Finding a region on " + rsLoc);
406 LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions
407 .get(rsLoc);
408 while (!regionList.isEmpty()) {
409 dr = regionList.pop();
410
411
412 byte[] split = dr.getSecond();
413 HRegionLocation regionLoc = table.getRegionLocation(split);
414
415
416 HServerAddress newRs = regionLoc.getServerAddress();
417 if (newRs.compareTo(rsLoc) != 0) {
418 LOG.debug("Region with " + splitAlgo.rowToStr(split)
419 + " moved to " + newRs + ". Relocating...");
420
421 if (!daughterRegions.containsKey(newRs)) {
422 LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
423 daughterRegions.put(newRs, entry);
424 }
425 daughterRegions.get(newRs).add(dr);
426 dr = null;
427 continue;
428 }
429
430
431 byte[] sk = regionLoc.getRegionInfo().getStartKey();
432 if (sk.length != 0) {
433 if (Bytes.equals(split, sk)) {
434 LOG.debug("Region already split on "
435 + splitAlgo.rowToStr(split) + ". Skipping this region...");
436 ++splitCount;
437 dr = null;
438 continue;
439 }
440 byte[] start = dr.getFirst();
441 Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
442 .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
443 }
444
445
446 break;
447 }
448 if (regionList.isEmpty()) {
449 daughterRegions.remove(rsLoc);
450 }
451 if (dr == null)
452 continue;
453
454
455 byte[] split = dr.getSecond();
456 LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
457 HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
458 admin.split(table.getTableName(), split);
459
460 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
461 if (conf.getBoolean("split.verify", true)) {
462
463 outstanding.addLast(dr);
464
465 while (outstanding.size() >= MAX_OUTSTANDING) {
466 finished = splitScan(outstanding, table, splitAlgo);
467 if (finished.isEmpty()) {
468 Thread.sleep(30 * 1000);
469 } else {
470 outstanding.removeAll(finished);
471 }
472 }
473 } else {
474 finished.add(dr);
475 }
476
477
478 for (Pair<byte[], byte[]> region : finished) {
479 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
480 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
481 splitCount++;
482 if (splitCount % 10 == 0) {
483 long tDiff = (System.currentTimeMillis() - startTime)
484 / splitCount;
485 LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
486 + ". Avg Time / Split = "
487 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
488 }
489 }
490 }
491 }
492 if (conf.getBoolean("split.verify", true)) {
493 while (!outstanding.isEmpty()) {
494 LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
495 table, splitAlgo);
496 if (finished.isEmpty()) {
497 Thread.sleep(30 * 1000);
498 } else {
499 outstanding.removeAll(finished);
500 for (Pair<byte[], byte[]> region : finished) {
501 splitOut.writeChars("- " + splitAlgo.rowToStr(region.getFirst())
502 + " " + splitAlgo.rowToStr(region.getSecond()) + "\n");
503 }
504 }
505 }
506 }
507 LOG.debug("All regions have been sucesfully split!");
508 } finally {
509 long tDiff = System.currentTimeMillis() - startTime;
510 LOG.debug("TOTAL TIME = "
511 + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
512 LOG.debug("Splits = " + splitCount);
513 LOG.debug("Avg Time / Split = "
514 + org.apache.hadoop.util.StringUtils.formatTime(tDiff / splitCount));
515
516 splitOut.close();
517 }
518 fs.delete(splitFile, false);
519 }
520
521 static LinkedList<Pair<byte[], byte[]>> splitScan(
522 LinkedList<Pair<byte[], byte[]>> regionList, HTable table,
523 SplitAlgorithm splitAlgo)
524 throws IOException, InterruptedException {
525 LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
526 LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
527 LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();
528
529
530 Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
531 Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
532 Path splitFile = new Path(tableDir, "_balancedSplit");
533 FileSystem fs = FileSystem.get(table.getConfiguration());
534
535
536 table.clearRegionCache();
537
538
539 for (Pair<byte[], byte[]> region : regionList) {
540 byte[] start = region.getFirst();
541 byte[] split = region.getSecond();
542
543
544 HRegionInfo dri = table.getRegionLocation(split).getRegionInfo();
545 if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
546 logicalSplitting.add(region);
547 continue;
548 }
549
550 try {
551
552
553 LinkedList<HRegionInfo> check = Lists.newLinkedList();
554 check.add(table.getRegionLocation(start).getRegionInfo());
555 check.add(table.getRegionLocation(split).getRegionInfo());
556 for (HRegionInfo hri : check.toArray(new HRegionInfo[] {})) {
557 boolean refFound = false;
558 byte[] sk = hri.getStartKey();
559 if (sk.length == 0)
560 sk = splitAlgo.firstRow();
561 String startKey = splitAlgo.rowToStr(sk);
562
563 for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
564 Path cfDir = Store.getStoreHomedir(tableDir, hri.getEncodedName(),
565 c.getName());
566 if (fs.exists(cfDir)) {
567 for (FileStatus file : fs.listStatus(cfDir)) {
568 refFound |= StoreFile.isReference(file.getPath());
569 if (refFound)
570 break;
571 }
572 }
573 if (refFound)
574 break;
575 }
576
577 if (!refFound) {
578 check.remove(hri);
579 }
580 }
581 if (check.isEmpty()) {
582 finished.add(region);
583 } else {
584 physicalSplitting.add(region);
585 }
586 } catch (NoServerForRegionException nsfre) {
587 LOG.debug("No Server Exception thrown for: "
588 + splitAlgo.rowToStr(start));
589 physicalSplitting.add(region);
590 table.clearRegionCache();
591 }
592 }
593
594 LOG.debug("Split Scan: " + finished.size() + " finished / "
595 + logicalSplitting.size() + " split wait / "
596 + physicalSplitting.size() + " reference wait");
597
598 return finished;
599 }
600
  /**
   * Builds the list of (start row, split point) pairs describing the splits
   * still to be performed, journaling the plan to the table's _balancedSplit
   * file.
   *
   * If no _balancedSplit file exists, the current region boundaries are read
   * from the table and a split point computed for each region; each planned
   * split is written as a "+" line.  If the file already exists (a previous
   * run was interrupted), it is replayed instead: "+" lines add a planned
   * split and "-" lines (appended by rollingSplit on completion) remove one.
   *
   * @param table the table being split
   * @param splitAlgo picks split points and converts rows to/from strings
   * @return the remaining (start row, split point) pairs as raw rows
   * @throws IOException on HDFS or META access failure
   */
  static LinkedList<Pair<byte[], byte[]>> getSplits(HTable table,
      SplitAlgorithm splitAlgo) throws IOException {
    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
    Path splitFile = new Path(tableDir, "_balancedSplit");
    FileSystem fs = FileSystem.get(table.getConfiguration());

    // keyed by string form so computed and replayed entries compare equal
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // does a split journal already exist?
    if (!fs.exists(splitFile)) {
      // no: fresh start — calculate the splits to make
      LOG.debug("No _balancedSplit file. Calculating splits...");

      // collect every region's (start, end), substituting the algorithm's
      // bounds for HBase's empty first/last keys
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = table.getStartEndKeys();
      Preconditions.checkArgument(
          tmp.getFirst().length == tmp.getSecond().length,
          "Start and End rows should be equivalent");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0)
          start = splitAlgo.firstRow();
        if (end.length == 0)
          end = splitAlgo.lastRow();
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + Bytes.toString(table.getTableName()) + " has "
          + rows.size() + " regions that will be split.");

      // write the plan to a temp file first, then atomically rename into
      // place so a crash never leaves a half-written journal
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // one "+" line per planned split: "+ <start> <splitPoint>"
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , "
            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
        tmpOut.writeChars("+ " + startStr + splitAlgo.separator() + splitStr
            + "\n");
      }
      tmpOut.close();
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      FSUtils.recoverFileLease(fs, splitFile, table.getConfiguration());

      // the journal was written with writeChars (UTF-16), so it must be read
      // back char-by-char with readChar
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
      // replay: each line is "<op><sep><start><sep><splitPoint>"
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(3 == cmd.length);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          // "-" marks a split completed by a previous run
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"),
              "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r),
              "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    // convert the string pairs back into raw rows for the caller
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
          .strToRow(r.getSecond())));
    }
    return ret;
  }
690
691
692
693
694
695
696
697
698 public static class MD5StringSplit implements SplitAlgorithm {
699 final static String MAXMD5 = "7FFFFFFF";
700 final static BigInteger MAXMD5_INT = new BigInteger(MAXMD5, 16);
701 final static int rowComparisonLength = MAXMD5.length();
702
703 public byte[] split(byte[] start, byte[] end) {
704 BigInteger s = convertToBigInteger(start);
705 BigInteger e = convertToBigInteger(end);
706 Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
707 return convertToByte(split2(s, e));
708 }
709
710 public byte[][] split(int n) {
711 BigInteger[] splits = new BigInteger[n - 1];
712 BigInteger sizeOfEachSplit = MAXMD5_INT.divide(BigInteger.valueOf(n));
713 for (int i = 1; i < n; i++) {
714
715
716 splits[i - 1] = sizeOfEachSplit.multiply(BigInteger.valueOf(i));
717 }
718 return convertToBytes(splits);
719 }
720
721 public byte[] firstRow() {
722 return convertToByte(BigInteger.ZERO);
723 }
724
725 public byte[] lastRow() {
726 return convertToByte(MAXMD5_INT);
727 }
728
729 public byte[] strToRow(String in) {
730 return convertToByte(new BigInteger(in, 16));
731 }
732
733 public String rowToStr(byte[] row) {
734 return Bytes.toStringBinary(row);
735 }
736
737 public String separator() {
738 return " ";
739 }
740
741 static BigInteger split2(BigInteger minValue, BigInteger maxValue) {
742 return maxValue.add(minValue).divide(BigInteger.valueOf(2));
743 }
744
745
746
747
748
749
750
751 static byte[][] convertToBytes(BigInteger[] bigIntegers) {
752 byte[][] returnBytes = new byte[bigIntegers.length][];
753 for (int i = 0; i < bigIntegers.length; i++) {
754 returnBytes[i] = convertToByte(bigIntegers[i]);
755 }
756 return returnBytes;
757 }
758
759
760
761
762
763
764
765 static byte[] convertToByte(BigInteger bigInteger) {
766 String bigIntegerString = bigInteger.toString(16);
767 bigIntegerString = StringUtils.leftPad(bigIntegerString,
768 rowComparisonLength, '0');
769 return Bytes.toBytes(bigIntegerString);
770 }
771
772
773
774
775
776
777
778 static BigInteger convertToBigInteger(byte[] row) {
779 return (row.length > 0) ? new BigInteger(Bytes.toString(row), 16)
780 : BigInteger.ZERO;
781 }
782 }
783
784 }