
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util;

import java.io.IOException;
import java.math.BigInteger;
import java.util.LinkedList;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.NoServerForRegionException;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * The {@link RegionSplitter} class provides several utilities to help in the
 * administration lifecycle for developers who choose to manually split regions
 * instead of having HBase handle that automatically. The most useful utilities
 * are:
 * <p>
 * <ul>
 * <li>Create a table with a specified number of pre-split regions
 * <li>Execute a rolling split of all regions on an existing table
 * </ul>
 * <p>
 * Both operations can be safely done on a live server.
 * <p>
 * <b>Question:</b> How do I turn off automatic splitting? <br>
 * <b>Answer:</b> Automatic splitting is determined by the configuration value
 * <i>"hbase.hregion.max.filesize"</i>. Setting this to Long.MAX_VALUE is not
 * recommended in case you forget about manual splits. A suggested setting is
 * 100GB, which would result in > 1hr major compactions if reached.
 * <p>
 * <b>Question:</b> Why did the original authors decide to manually split? <br>
 * <b>Answer:</b> Specific workload characteristics of our use case allowed us
 * to benefit from a manual split system.
 * <p>
 * <ul>
 * <li>Data (~1k) that would grow instead of being replaced
 * <li>Data growth was roughly uniform across all regions
 * <li>OLTP workload. Data loss is a big deal.
 * </ul>
 * <p>
 * <b>Question:</b> Why is manual splitting good for this workload? <br>
 * <b>Answer:</b> Although automated splitting is not a bad option, there are
 * benefits to manual splitting.
 * <p>
 * <ul>
 * <li>With growing amounts of data, splits will continually be needed. Since
 * you always know exactly what regions you have, long-term debugging and
 * profiling is much easier with manual splits. It is hard to trace the logs to
 * understand region level problems if it keeps splitting and getting renamed.
 * <li>Data offlining bugs + unknown number of split regions == oh crap! If an
 * HLog or StoreFile was mistakenly unprocessed by HBase due to a weird bug and
 * you notice it a day or so later, you can be assured that the regions
 * specified in these files are the same as the current regions, and you have
 * fewer headaches trying to restore/replay your data.
 * <li>You can finely tune your compaction algorithm. With roughly uniform data
 * growth, it's easy to cause split / compaction storms as the regions all
 * roughly hit the same data size at the same time. With manual splits, you can
 * let staggered, time-based major compactions spread out your network IO load.
 * </ul>
 * <p>
 * <b>Question:</b> What's the optimal number of pre-split regions to create? <br>
 * <b>Answer:</b> Mileage will vary depending upon your application.
 * <p>
 * The short answer for our application is that we started with 10 pre-split
 * regions / server and watched our data growth over time. It's better to err
 * on the side of too few regions and rolling split later.
 * <p>
 * The more complicated answer is that this depends upon the largest storefile
 * in your region. With a growing data size, this will get larger over time.
 * You want the largest region to be just big enough that the {@link Store}
 * compaction selection algorithm only compacts it due to a timed major. If you
 * don't, your cluster can be prone to compaction storms as the algorithm
 * decides to run major compactions on a large series of regions all at once.
 * Note that compaction storms are due to the uniform data growth, not the
 * manual split decision.
 * <p>
 * If you pre-split your regions too thin, you can increase the major compaction
 * interval by configuring HConstants.MAJOR_COMPACTION_PERIOD. If your data size
 * grows too large, use this script to perform a network IO safe rolling split
 * of all regions.
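 * <p>
 * As an illustrative sketch only (the value is the 100GB suggestion above in
 * bytes), the corresponding hbase-site.xml override on the region servers
 * would look like:
 *
 * <pre>
 * &lt;property&gt;
 *   &lt;name&gt;hbase.hregion.max.filesize&lt;/name&gt;
 *   &lt;value&gt;107374182400&lt;/value&gt;
 * &lt;/property&gt;
 * </pre>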
 */
public class RegionSplitter {
  static final Log LOG = LogFactory.getLog(RegionSplitter.class);

  /**
   * A generic interface for the RegionSplitter code to use for all of its
   * functionality. Note that the original authors of this code used
   * {@link MD5StringSplit} to partition their tables and set it as the
   * default, but provided this interface for your custom algorithm. To use,
   * create a new class derived from this interface and call the
   * RegionSplitter class with the argument: <br>
   * <b>-D split.algorithm=&lt;your_class_path&gt;</b>
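   * <p>
   * A minimal custom implementation might look like the following sketch (the
   * class name is hypothetical and the method bodies are elided):
   *
   * <pre>
   * public class MyCustomSplit implements RegionSplitter.SplitAlgorithm {
   *   public byte[] split(byte[] start, byte[] end) { ... }
   *   public byte[][] split(int numberOfSplits) { ... }
   *   // plus firstRow(), lastRow(), strToRow(), rowToStr(), and separator()
   * }
   * </pre>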
   */
  public static interface SplitAlgorithm {
    /**
     * Split a pre-existing region into 2 regions.
     *
     * @param start
     *          row
     * @param end
     *          row
     * @return the split row to use
     */
    byte[] split(byte[] start, byte[] end);

    /**
     * Split an entire table.
     *
     * @param numberOfSplits
     *          number of regions to split the table into
     *
     * @return array of split keys for the initial regions of the table
     */
    byte[][] split(int numberOfSplits);

    /**
     * In HBase, the first row is represented by an empty byte array. This
     * might cause problems with your split algorithm or row printing. All your
     * APIs will be passed firstRow() instead of an empty array.
     *
     * @return your representation of your first row
     */
    byte[] firstRow();

    /**
     * In HBase, the last row is represented by an empty byte array. This might
     * cause problems with your split algorithm or row printing. All your APIs
     * will be passed lastRow() instead of an empty array.
     *
     * @return your representation of your last row
     */
    byte[] lastRow();

    /**
     * @param input
     *          user or file input for row
     * @return byte array representation of this row for HBase
     */
    byte[] strToRow(String input);

    /**
     * @param row
     *          byte array representing a row in HBase
     * @return String to use for debug & file printing
     */
    String rowToStr(byte[] row);

    /**
     * @return the separator character to use when storing / printing the row
     */
    String separator();
  }

  /**
   * The main function for the RegionSplitter application. Common uses:
   * <p>
   * <ul>
   * <li>create a table named 'myTable' with 60 pre-split regions containing 2
   * column families 'test' & 'rs':
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -c 60 -f test:rs myTable
   * </ul>
   * <li>perform a rolling split of 'myTable' (i.e. 60 => 120 regions), with at
   * most 2 outstanding splits at a time:
   * <ul>
   * <li>bin/hbase org.apache.hadoop.hbase.util.RegionSplitter -r -o 2 myTable
   * </ul>
   * </ul>
   *
   * @param args
   *          Usage: RegionSplitter &lt;TABLE&gt; &lt;-c &lt;# regions&gt; -f
   *          &lt;family:family:...&gt; | -r [-o &lt;# outstanding
   *          splits&gt;]&gt; [-D &lt;conf.param=value&gt;]
   * @throws IOException
   *           HBase IO problem
   * @throws InterruptedException
   *           user requested exit
   * @throws ParseException
   *           problem parsing user input
   */
  @SuppressWarnings("static-access")
  public static void main(String[] args) throws IOException,
      InterruptedException, ParseException {
    Configuration conf = HBaseConfiguration.create();

    // parse user input
    Options opt = new Options();
    opt.addOption(OptionBuilder.withArgName("property=value").hasArg()
        .withDescription("Override HBase Configuration Settings").create("D"));
    opt.addOption(OptionBuilder.withArgName("region count").hasArg()
        .withDescription(
            "Create a new table with a pre-split number of regions")
        .create("c"));
    opt.addOption(OptionBuilder.withArgName("family:family:...").hasArg()
        .withDescription(
            "Column Families to create with new table.  Required with -c")
        .create("f"));
    opt.addOption("h", false, "Print this usage help");
    opt.addOption("r", false, "Perform a rolling split of an existing region");
    opt.addOption(OptionBuilder.withArgName("count").hasArg().withDescription(
        "Max outstanding splits that have unfinished major compactions")
        .create("o"));
    opt.addOption(null, "risky", false,
        "Skip verification steps to complete quickly.  "
            + "STRONGLY DISCOURAGED for production systems.");
    CommandLine cmd = new GnuParser().parse(opt, args);

    if (cmd.hasOption("D")) {
      for (String confOpt : cmd.getOptionValues("D")) {
        String[] kv = confOpt.split("=", 2);
        if (kv.length == 2) {
          conf.set(kv[0], kv[1]);
          LOG.debug("-D configuration override: " + kv[0] + "=" + kv[1]);
        } else {
          throw new ParseException("-D option format invalid: " + confOpt);
        }
      }
    }

    if (cmd.hasOption("risky")) {
      conf.setBoolean("split.verify", false);
    }

    boolean createTable = cmd.hasOption("c") && cmd.hasOption("f");
    boolean rollingSplit = cmd.hasOption("r");
    boolean oneOperOnly = createTable ^ rollingSplit;

    if (1 != cmd.getArgList().size() || !oneOperOnly || cmd.hasOption("h")) {
      new HelpFormatter().printHelp("RegionSplitter <TABLE>", opt);
      return;
    }
    String tableName = cmd.getArgs()[0];

    if (createTable) {
      conf.set("split.count", cmd.getOptionValue("c"));
      createPresplitTable(tableName, cmd.getOptionValue("f").split(":"), conf);
    }

    if (rollingSplit) {
      if (cmd.hasOption("o")) {
        conf.set("split.outstanding", cmd.getOptionValue("o"));
      }
      rollingSplit(tableName, conf);
    }
  }

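  /**
   * Creates tableName with the given column families, pre-split into
   * "split.count" regions using the configured split algorithm (default
   * {@link MD5StringSplit}), then waits for all regions to come online unless
   * "split.verify" is false.
   */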
  static void createPresplitTable(String tableName, String[] columnFamilies,
      Configuration conf) throws IOException, InterruptedException {
    Class<? extends SplitAlgorithm> splitClass = conf.getClass(
        "split.algorithm", MD5StringSplit.class, SplitAlgorithm.class);
    SplitAlgorithm splitAlgo;
    try {
      splitAlgo = splitClass.newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
    final int splitCount = conf.getInt("split.count", 0);
    Preconditions.checkArgument(splitCount > 1, "Split count must be > 1");

    Preconditions.checkArgument(columnFamilies.length > 0,
        "Must specify at least one column family. ");
    LOG.debug("Creating table " + tableName + " with " + columnFamilies.length
        + " column families.  Presplitting to " + splitCount + " regions");

    HTableDescriptor desc = new HTableDescriptor(tableName);
    for (String cf : columnFamilies) {
      desc.addFamily(new HColumnDescriptor(Bytes.toBytes(cf)));
    }
    HBaseAdmin admin = new HBaseAdmin(conf);
    Preconditions.checkArgument(!admin.tableExists(tableName),
        "Table already exists: " + tableName);
    admin.createTable(desc, splitAlgo.split(splitCount));
    LOG.debug("Table created!  Waiting for regions to show online in META...");

    if (conf.getBoolean("split.verify", true)) {
      // NOTE: createTable is synchronous on the table, but not on the regions
      HTable table = new HTable(conf, tableName);
      int onlineRegions = 0;
      while (onlineRegions < splitCount) {
        onlineRegions = table.getRegionsInfo().size();
        LOG.debug(onlineRegions + " of " + splitCount + " regions online...");
        if (onlineRegions < splitCount) {
          Thread.sleep(10 * 1000); // sleep
        }
      }
    }

    LOG.debug("Finished creating table with " + splitCount + " regions");
  }

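  /**
   * Splits every region in tableName in a rolling fashion: regions are
   * bucketed by region server and visited round-robin, with at most
   * max("split.outstanding", half the servers) splits left unverified at a
   * time. Progress is journaled in the table's _balancedSplit file so an
   * interrupted run can resume where it left off.
   */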
  static void rollingSplit(String tableName, Configuration conf)
      throws IOException, InterruptedException {
    Class<? extends SplitAlgorithm> splitClass = conf.getClass(
        "split.algorithm", MD5StringSplit.class, SplitAlgorithm.class);
    SplitAlgorithm splitAlgo;
    try {
      splitAlgo = splitClass.newInstance();
    } catch (Exception e) {
      throw new IOException("Problem loading split algorithm: ", e);
    }
    final int minOS = conf.getInt("split.outstanding", 2);

    HTable table = new HTable(conf, tableName);

    // max outstanding splits. default == 50% of servers
    final int MAX_OUTSTANDING = Math.max(table.getCurrentNrHRS() / 2, minOS);

    Path hbDir = new Path(conf.get(HConstants.HBASE_DIR));
    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
    Path splitFile = new Path(tableDir, "_balancedSplit");
    FileSystem fs = FileSystem.get(conf);

    // get a list of daughter regions to create
    LinkedList<Pair<byte[], byte[]>> tmpRegionSet = getSplits(table, splitAlgo);
    LinkedList<Pair<byte[], byte[]>> outstanding = Lists.newLinkedList();
    int splitCount = 0;
    final int origCount = tmpRegionSet.size();

    // all splits must compact & we have 1 compact thread, so 2 split
    // requests to the same RS can stall the outstanding split queue.
    // To fix, group the regions into an RS pool and round-robin through it
    LOG.debug("Bucketing regions by regionserver...");
    TreeMap<HServerAddress, LinkedList<Pair<byte[], byte[]>>> daughterRegions = Maps
        .newTreeMap();
    for (Pair<byte[], byte[]> dr : tmpRegionSet) {
      HServerAddress rsLocation = table.getRegionLocation(dr.getSecond())
          .getServerAddress();
      if (!daughterRegions.containsKey(rsLocation)) {
        LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
        daughterRegions.put(rsLocation, entry);
      }
      daughterRegions.get(rsLocation).add(dr);
    }
    LOG.debug("Done with bucketing.  Split time!");
    long startTime = System.currentTimeMillis();

    // open the split file and modify it as splits finish
    FSDataInputStream tmpIn = fs.open(splitFile);
    byte[] rawData = new byte[tmpIn.available()];
    tmpIn.readFully(rawData);
    tmpIn.close();
    FSDataOutputStream splitOut = fs.create(splitFile);
    splitOut.write(rawData);

    try {
      // *** split code ***
      while (!daughterRegions.isEmpty()) {
        LOG.debug(daughterRegions.size() + " RS have regions to split.");

        // round-robin through the RS list
        for (HServerAddress rsLoc = daughterRegions.firstKey();
             rsLoc != null;
             rsLoc = daughterRegions.higherKey(rsLoc)) {
          Pair<byte[], byte[]> dr = null;

          // find a region in the RS list that hasn't been moved
          LOG.debug("Finding a region on " + rsLoc);
          LinkedList<Pair<byte[], byte[]>> regionList = daughterRegions
              .get(rsLoc);
          while (!regionList.isEmpty()) {
            dr = regionList.pop();

            // get current region info
            byte[] split = dr.getSecond();
            HRegionLocation regionLoc = table.getRegionLocation(split);

            // if this region moved locations
            HServerAddress newRs = regionLoc.getServerAddress();
            if (newRs.compareTo(rsLoc) != 0) {
              LOG.debug("Region with " + splitAlgo.rowToStr(split)
                  + " moved to " + newRs + ". Relocating...");
              // relocate it, don't use it right now
              if (!daughterRegions.containsKey(newRs)) {
                LinkedList<Pair<byte[], byte[]>> entry = Lists.newLinkedList();
                daughterRegions.put(newRs, entry);
              }
              daughterRegions.get(newRs).add(dr);
              dr = null;
              continue;
            }

            // make sure this region wasn't already split
            byte[] sk = regionLoc.getRegionInfo().getStartKey();
            if (sk.length != 0) {
              if (Bytes.equals(split, sk)) {
                LOG.debug("Region already split on "
                    + splitAlgo.rowToStr(split) + ".  Skipping this region...");
                ++splitCount;
                dr = null;
                continue;
              }
              byte[] start = dr.getFirst();
              Preconditions.checkArgument(Bytes.equals(start, sk), splitAlgo
                  .rowToStr(start) + " != " + splitAlgo.rowToStr(sk));
            }

            // passed all checks! found a good region
            break;
          }
          if (regionList.isEmpty()) {
            daughterRegions.remove(rsLoc);
          }
          if (dr == null)
            continue;

          // we have a good region, time to split!
          byte[] split = dr.getSecond();
          LOG.debug("Splitting at " + splitAlgo.rowToStr(split));
          HBaseAdmin admin = new HBaseAdmin(table.getConfiguration());
          admin.split(table.getTableName(), split);

          LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
          if (conf.getBoolean("split.verify", true)) {
            // we need to verify and rate-limit our splits
            outstanding.addLast(dr);
            // with too many outstanding splits, wait for some to finish
            while (outstanding.size() >= MAX_OUTSTANDING) {
              finished = splitScan(outstanding, table, splitAlgo);
              if (finished.isEmpty()) {
                Thread.sleep(30 * 1000);
              } else {
                outstanding.removeAll(finished);
              }
            }
          } else {
            finished.add(dr);
          }

          // mark each finished region as successfully split.
          for (Pair<byte[], byte[]> region : finished) {
            splitOut.writeChars("-" + splitAlgo.separator()
                + splitAlgo.rowToStr(region.getFirst()) + splitAlgo.separator()
                + splitAlgo.rowToStr(region.getSecond()) + "\n");
            splitCount++;
            if (splitCount % 10 == 0) {
              long tDiff = (System.currentTimeMillis() - startTime)
                  / splitCount;
              LOG.debug("STATUS UPDATE: " + splitCount + " / " + origCount
                  + ". Avg Time / Split = "
                  + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
            }
          }
        }
      }
      if (conf.getBoolean("split.verify", true)) {
        while (!outstanding.isEmpty()) {
          LinkedList<Pair<byte[], byte[]>> finished = splitScan(outstanding,
              table, splitAlgo);
          if (finished.isEmpty()) {
            Thread.sleep(30 * 1000);
          } else {
            outstanding.removeAll(finished);
            for (Pair<byte[], byte[]> region : finished) {
              splitOut.writeChars("-" + splitAlgo.separator()
                  + splitAlgo.rowToStr(region.getFirst())
                  + splitAlgo.separator()
                  + splitAlgo.rowToStr(region.getSecond()) + "\n");
            }
          }
        }
      }
      LOG.debug("All regions have been successfully split!");
    } finally {
      long tDiff = System.currentTimeMillis() - startTime;
      LOG.debug("TOTAL TIME = "
          + org.apache.hadoop.util.StringUtils.formatTime(tDiff));
      LOG.debug("Splits = " + splitCount);
      if (splitCount > 0) {
        LOG.debug("Avg Time / Split = "
            + org.apache.hadoop.util.StringUtils
                .formatTime(tDiff / splitCount));
      }

      splitOut.close();
    }
    fs.delete(splitFile, false);
  }

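  /**
   * Scans each region in regionList and buckets it as finished (daughters
   * online and no reference files left), still logically splitting (daughter
   * region not yet online), or still physically splitting (reference files
   * not yet compacted away).
   *
   * @return the subset of regionList whose splits have fully completed
   */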
  static LinkedList<Pair<byte[], byte[]>> splitScan(
      LinkedList<Pair<byte[], byte[]>> regionList, HTable table,
      SplitAlgorithm splitAlgo)
      throws IOException, InterruptedException {
    LinkedList<Pair<byte[], byte[]>> finished = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> logicalSplitting = Lists.newLinkedList();
    LinkedList<Pair<byte[], byte[]>> physicalSplitting = Lists.newLinkedList();

    // get table info
    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
    FileSystem fs = FileSystem.get(table.getConfiguration());

    // clear the cache to forcibly refresh region information
    table.clearRegionCache();

    // for every region that hasn't been verified as a finished split
    for (Pair<byte[], byte[]> region : regionList) {
      byte[] start = region.getFirst();
      byte[] split = region.getSecond();

      // see if the new split daughter region has come online
      HRegionInfo dri = table.getRegionLocation(split).getRegionInfo();
      if (dri.isOffline() || !Bytes.equals(dri.getStartKey(), split)) {
        logicalSplitting.add(region);
        continue;
      }

      try {
        // when a daughter region is opened, a compaction is triggered
        // wait until compaction completes for both daughter regions
        LinkedList<HRegionInfo> check = Lists.newLinkedList();
        check.add(table.getRegionLocation(start).getRegionInfo());
        check.add(table.getRegionLocation(split).getRegionInfo());
        for (HRegionInfo hri : check.toArray(new HRegionInfo[] {})) {
          boolean refFound = false;
          // check every Column Family for that region
          for (HColumnDescriptor c : hri.getTableDesc().getFamilies()) {
            Path cfDir = Store.getStoreHomedir(tableDir, hri.getEncodedName(),
                c.getName());
            if (fs.exists(cfDir)) {
              for (FileStatus file : fs.listStatus(cfDir)) {
                refFound |= StoreFile.isReference(file.getPath());
                if (refFound)
                  break;
              }
            }
            if (refFound)
              break;
          }
          // compaction is completed when all reference files are gone
          if (!refFound) {
            check.remove(hri);
          }
        }
        if (check.isEmpty()) {
          finished.add(region);
        } else {
          physicalSplitting.add(region);
        }
      } catch (NoServerForRegionException nsfre) {
        LOG.debug("No Server Exception thrown for: "
            + splitAlgo.rowToStr(start));
        physicalSplitting.add(region);
        table.clearRegionCache();
      }
    }

    LOG.debug("Split Scan: " + finished.size() + " finished / "
        + logicalSplitting.size() + " split wait / "
        + physicalSplitting.size() + " reference wait");

    return finished;
  }

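  /**
   * Determines the (start row, split point) pairs still to be split: either
   * freshly computed from the table's current regions via the split
   * algorithm, or restored by replaying an existing _balancedSplit journal.
   */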
  static LinkedList<Pair<byte[], byte[]>> getSplits(HTable table,
      SplitAlgorithm splitAlgo) throws IOException {
    Path hbDir = new Path(table.getConfiguration().get(HConstants.HBASE_DIR));
    Path tableDir = HTableDescriptor.getTableDir(hbDir, table.getTableName());
    Path splitFile = new Path(tableDir, "_balancedSplit");
    FileSystem fs = FileSystem.get(table.getConfiguration());

    // using strings because (new byte[]{0}).equals(new byte[]{0}) == false
    Set<Pair<String, String>> daughterRegions = Sets.newHashSet();

    // does a split file exist?
    if (!fs.exists(splitFile)) {
      // NO = fresh start. calculate splits to make
      LOG.debug("No _balancedSplit file.  Calculating splits...");

      // query meta for all regions in the table
      Set<Pair<byte[], byte[]>> rows = Sets.newHashSet();
      Pair<byte[][], byte[][]> tmp = table.getStartEndKeys();
      Preconditions.checkArgument(
          tmp.getFirst().length == tmp.getSecond().length,
          "Start and End rows should be equivalent");
      for (int i = 0; i < tmp.getFirst().length; ++i) {
        byte[] start = tmp.getFirst()[i], end = tmp.getSecond()[i];
        if (start.length == 0)
          start = splitAlgo.firstRow();
        if (end.length == 0)
          end = splitAlgo.lastRow();
        rows.add(Pair.newPair(start, end));
      }
      LOG.debug("Table " + Bytes.toString(table.getTableName()) + " has "
          + rows.size() + " regions that will be split.");

      // prepare the split file
      Path tmpFile = new Path(tableDir, "_balancedSplit_prepare");
      FSDataOutputStream tmpOut = fs.create(tmpFile);

      // calculate all the splits == [daughterRegions] = [(start, splitPoint)]
      for (Pair<byte[], byte[]> r : rows) {
        byte[] splitPoint = splitAlgo.split(r.getFirst(), r.getSecond());
        String startStr = splitAlgo.rowToStr(r.getFirst());
        String splitStr = splitAlgo.rowToStr(splitPoint);
        daughterRegions.add(Pair.newPair(startStr, splitStr));
        LOG.debug("Will Split [" + startStr + " , "
            + splitAlgo.rowToStr(r.getSecond()) + ") at " + splitStr);
        tmpOut.writeChars("+" + splitAlgo.separator() + startStr
            + splitAlgo.separator() + splitStr + "\n");
      }
      tmpOut.close();
      fs.rename(tmpFile, splitFile);
    } else {
      LOG.debug("_balancedSplit file found. Replay log to restore state...");
      FSUtils.recoverFileLease(fs, splitFile, table.getConfiguration());

      // parse split file and process remaining splits
      FSDataInputStream tmpIn = fs.open(splitFile);
      StringBuilder sb = new StringBuilder(tmpIn.available());
      while (tmpIn.available() > 0) {
        sb.append(tmpIn.readChar());
      }
      tmpIn.close();
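
      // Each journal line is "<op><sep><start><sep><split>", where <op> is
      // "+" when a split was planned and "-" once it was verified complete;
      // replaying the file leaves only the splits still to be done.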
      for (String line : sb.toString().split("\n")) {
        String[] cmd = line.split(splitAlgo.separator());
        Preconditions.checkArgument(3 == cmd.length);
        byte[] start = splitAlgo.strToRow(cmd[1]);
        String startStr = splitAlgo.rowToStr(start);
        byte[] splitPoint = splitAlgo.strToRow(cmd[2]);
        String splitStr = splitAlgo.rowToStr(splitPoint);
        Pair<String, String> r = Pair.newPair(startStr, splitStr);
        if (cmd[0].equals("+")) {
          LOG.debug("Adding: " + r);
          daughterRegions.add(r);
        } else {
          LOG.debug("Removing: " + r);
          Preconditions.checkArgument(cmd[0].equals("-"),
              "Unknown option: " + cmd[0]);
          Preconditions.checkState(daughterRegions.contains(r),
              "Missing row: " + r);
          daughterRegions.remove(r);
        }
      }
      LOG.debug("Done reading. " + daughterRegions.size() + " regions left.");
    }
    LinkedList<Pair<byte[], byte[]>> ret = Lists.newLinkedList();
    for (Pair<String, String> r : daughterRegions) {
      ret.add(Pair.newPair(splitAlgo.strToRow(r.getFirst()), splitAlgo
          .strToRow(r.getSecond())));
    }
    return ret;
  }

  /**
   * MD5StringSplit is the default {@link SplitAlgorithm} for creating
   * pre-split tables. The format of MD5StringSplit is the ASCII representation
   * of an MD5 checksum. Rows are long values in the range <b>"00000000" =>
   * "7FFFFFFF"</b> and are left-padded with zeros to keep the same order
   * lexicographically as if they were binary.
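   * <p>
   * As a worked example, split(4) returns the 3 split keys
   * i * (MAXMD5_INT / 4) for i = 1..3, hex-encoded and left-padded:
   *
   * <pre>
   * "1fffffff", "3ffffffe", "5ffffffd"
   * </pre>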
   */
  public static class MD5StringSplit implements SplitAlgorithm {
    final static String MAXMD5 = "7FFFFFFF";
    final static BigInteger MAXMD5_INT = new BigInteger(MAXMD5, 16);
    final static int rowComparisonLength = MAXMD5.length();

    public byte[] split(byte[] start, byte[] end) {
      BigInteger s = convertToBigInteger(start);
      BigInteger e = convertToBigInteger(end);
      Preconditions.checkArgument(!e.equals(BigInteger.ZERO));
      return convertToByte(split2(s, e));
    }

    public byte[][] split(int n) {
      BigInteger[] splits = new BigInteger[n - 1];
      BigInteger sizeOfEachSplit = MAXMD5_INT.divide(BigInteger.valueOf(n));
      for (int i = 1; i < n; i++) {
        // NOTE: this means the last region gets all the slop.
        // This is not a big deal if we're assuming n << MAXMD5
        splits[i - 1] = sizeOfEachSplit.multiply(BigInteger.valueOf(i));
      }
      return convertToBytes(splits);
    }

    public byte[] firstRow() {
      return convertToByte(BigInteger.ZERO);
    }

    public byte[] lastRow() {
      return convertToByte(MAXMD5_INT);
    }

    public byte[] strToRow(String in) {
      return convertToByte(new BigInteger(in, 16));
    }

    public String rowToStr(byte[] row) {
      return Bytes.toStringBinary(row);
    }

    public String separator() {
      return " ";
    }

    static BigInteger split2(BigInteger minValue, BigInteger maxValue) {
      return maxValue.add(minValue).divide(BigInteger.valueOf(2));
    }

    /**
     * Returns an array of byte arrays corresponding to an array of BigIntegers
     *
     * @param bigIntegers
     *          the values to convert
     * @return bytes corresponding to the bigIntegers
     */
    static byte[][] convertToBytes(BigInteger[] bigIntegers) {
      byte[][] returnBytes = new byte[bigIntegers.length][];
      for (int i = 0; i < bigIntegers.length; i++) {
        returnBytes[i] = convertToByte(bigIntegers[i]);
      }
      return returnBytes;
    }

    /**
     * Returns the bytes corresponding to the BigInteger
     *
     * @param bigInteger
     *          the value to convert
     * @return bytes corresponding to the input BigInteger
     */
    static byte[] convertToByte(BigInteger bigInteger) {
      String bigIntegerString = bigInteger.toString(16);
      bigIntegerString = StringUtils.leftPad(bigIntegerString,
          rowComparisonLength, '0');
      return Bytes.toBytes(bigIntegerString);
    }

    /**
     * Returns the BigInteger represented by the byte array
     *
     * @param row
     *          the byte array to convert
     * @return the corresponding BigInteger
     */
    static BigInteger convertToBigInteger(byte[] row) {
      return (row.length > 0) ? new BigInteger(Bytes.toString(row), 16)
          : BigInteger.ZERO;
    }
  }

}