1   /**
2    * Copyright 2008 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.util;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.fs.Path;
30  import org.apache.hadoop.hbase.HBaseTestCase;
31  import org.apache.hadoop.hbase.HColumnDescriptor;
32  import org.apache.hadoop.hbase.HConstants;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HTableDescriptor;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.client.Get;
37  import org.apache.hadoop.hbase.client.Put;
38  import org.apache.hadoop.hbase.client.Result;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.regionserver.wal.HLog;
41  import org.apache.hadoop.hbase.regionserver.HRegion;
42  import org.apache.hadoop.hbase.regionserver.InternalScanner;
43  import org.apache.hadoop.hdfs.MiniDFSCluster;
44  import org.apache.hadoop.util.ToolRunner;
45  
46  /** Test stand alone merge tool that can merge arbitrary regions */
47  public class TestMergeTool extends HBaseTestCase {
48    static final Log LOG = LogFactory.getLog(TestMergeTool.class);
49  //  static final byte [] COLUMN_NAME = Bytes.toBytes("contents:");
50    static final byte [] FAMILY = Bytes.toBytes("contents");
51    static final byte [] QUALIFIER = Bytes.toBytes("dc");
52  
53    private final HRegionInfo[] sourceRegions = new HRegionInfo[5];
54    private final HRegion[] regions = new HRegion[5];
55    private HTableDescriptor desc;
56    private byte [][][] rows;
57    private MiniDFSCluster dfsCluster = null;
58  
59    @Override
60    public void setUp() throws Exception {
61      // Set the timeout down else this test will take a while to complete.
62      this.conf.setLong("hbase.zookeeper.recoverable.waittime", 1000);
63  
64      this.conf.set("hbase.hstore.compactionThreshold", "2");
65  
66      // Create table description
67      this.desc = new HTableDescriptor("TestMergeTool");
68      this.desc.addFamily(new HColumnDescriptor(FAMILY));
69  
70      /*
71       * Create the HRegionInfos for the regions.
72       */
73      // Region 0 will contain the key range [row_0200,row_0300)
74      sourceRegions[0] = new HRegionInfo(this.desc, Bytes.toBytes("row_0200"),
75        Bytes.toBytes("row_0300"));
76  
77      // Region 1 will contain the key range [row_0250,row_0400) and overlaps
78      // with Region 0
79      sourceRegions[1] =
80        new HRegionInfo(this.desc, Bytes.toBytes("row_0250"),
81            Bytes.toBytes("row_0400"));
82  
83      // Region 2 will contain the key range [row_0100,row_0200) and is adjacent
84      // to Region 0 or the region resulting from the merge of Regions 0 and 1
85      sourceRegions[2] =
86        new HRegionInfo(this.desc, Bytes.toBytes("row_0100"),
87            Bytes.toBytes("row_0200"));
88  
89      // Region 3 will contain the key range [row_0500,row_0600) and is not
90      // adjacent to any of Regions 0, 1, 2 or the merged result of any or all
91      // of those regions
92      sourceRegions[3] =
93        new HRegionInfo(this.desc, Bytes.toBytes("row_0500"),
94            Bytes.toBytes("row_0600"));
95  
96      // Region 4 will have empty start and end keys and overlaps all regions.
97      sourceRegions[4] =
98        new HRegionInfo(this.desc, HConstants.EMPTY_BYTE_ARRAY,
99            HConstants.EMPTY_BYTE_ARRAY);
100 
101     /*
102      * Now create some row keys
103      */
104     this.rows = new byte [5][][];
105     this.rows[0] = Bytes.toByteArrays(new String[] { "row_0210", "row_0280" });
106     this.rows[1] = Bytes.toByteArrays(new String[] { "row_0260", "row_0350",
107         "row_035" });
108     this.rows[2] = Bytes.toByteArrays(new String[] { "row_0110", "row_0175",
109         "row_0175", "row_0175"});
110     this.rows[3] = Bytes.toByteArrays(new String[] { "row_0525", "row_0560",
111         "row_0560", "row_0560", "row_0560"});
112     this.rows[4] = Bytes.toByteArrays(new String[] { "row_0050", "row_1000",
113         "row_1000", "row_1000", "row_1000", "row_1000" });
114 
115     // Start up dfs
116     this.dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
117     this.fs = this.dfsCluster.getFileSystem();
118     System.out.println("fs=" + this.fs);
119     this.conf.set("fs.defaultFS", fs.getUri().toString());
120     Path parentdir = fs.getHomeDirectory();
121     conf.set(HConstants.HBASE_DIR, parentdir.toString());
122     fs.mkdirs(parentdir);
123     FSUtils.setVersion(fs, parentdir);
124 
125     // Note: we must call super.setUp after starting the mini cluster or
126     // we will end up with a local file system
127 
128     super.setUp();
129     try {
130       // Create root and meta regions
131       createRootAndMetaRegions();
132       /*
133        * Create the regions we will merge
134        */
135       for (int i = 0; i < sourceRegions.length; i++) {
136         regions[i] =
137           HRegion.createHRegion(this.sourceRegions[i], this.testDir, this.conf);
138         /*
139          * Insert data
140          */
141         for (int j = 0; j < rows[i].length; j++) {
142           byte [] row = rows[i][j];
143           Put put = new Put(row);
144           put.add(FAMILY, QUALIFIER, row);
145           regions[i].put(put);
146         }
147         HRegion.addRegionToMETA(meta, regions[i]);
148       }
149       // Close root and meta regions
150       closeRootAndMeta();
151 
152     } catch (Exception e) {
153       shutdownDfs(dfsCluster);
154       throw e;
155     }
156   }
157 
158   @Override
159   public void tearDown() throws Exception {
160     super.tearDown();
161     shutdownDfs(dfsCluster);
162   }
163 
164   /*
165    * @param msg Message that describes this merge
166    * @param regionName1
167    * @param regionName2
168    * @param log Log to use merging.
169    * @param upperbound Verifying, how high up in this.rows to go.
170    * @return Merged region.
171    * @throws Exception
172    */
173   private HRegion mergeAndVerify(final String msg, final String regionName1,
174     final String regionName2, final HLog log, final int upperbound)
175   throws Exception {
176     Merge merger = new Merge(this.conf);
177     LOG.info(msg);
178     System.out.println("fs2=" + this.conf.get("fs.defaultFS"));
179     int errCode = ToolRunner.run(this.conf, merger,
180       new String[] {this.desc.getNameAsString(), regionName1, regionName2}
181     );
182     assertTrue("'" + msg + "' failed", errCode == 0);
183     HRegionInfo mergedInfo = merger.getMergedHRegionInfo();
184 
185     // Now verify that we can read all the rows from regions 0, 1
186     // in the new merged region.
187     HRegion merged = HRegion.openHRegion(mergedInfo, log, this.conf);
188     verifyMerge(merged, upperbound);
189     merged.close();
190     LOG.info("Verified " + msg);
191     return merged;
192   }
193 
194   private void verifyMerge(final HRegion merged, final int upperbound)
195   throws IOException {
196     //Test
197     Scan scan = new Scan();
198     scan.addFamily(FAMILY);
199     InternalScanner scanner = merged.getScanner(scan);
200     try {
201     List<KeyValue> testRes = null;
202       while (true) {
203         testRes = new ArrayList<KeyValue>();
204         boolean hasNext = scanner.next(testRes);
205         if (!hasNext) {
206           break;
207         }
208       }
209     } finally {
210       scanner.close();
211     }
212 
213     //!Test
214 
215     for (int i = 0; i < upperbound; i++) {
216       for (int j = 0; j < rows[i].length; j++) {
217         Get get = new Get(rows[i][j]);
218         get.addFamily(FAMILY);
219         Result result = merged.get(get, null);
220         assertEquals(1, result.size());
221         byte [] bytes = result.sorted()[0].getValue();
222         assertNotNull(Bytes.toStringBinary(rows[i][j]), bytes);
223         assertTrue(Bytes.equals(bytes, rows[i][j]));
224       }
225     }
226   }
227 
228   /**
229    * Test merge tool.
230    * @throws Exception
231    */
232   public void testMergeTool() throws Exception {
233     // First verify we can read the rows from the source regions and that they
234     // contain the right data.
235     for (int i = 0; i < regions.length; i++) {
236       for (int j = 0; j < rows[i].length; j++) {
237         Get get = new Get(rows[i][j]);
238         get.addFamily(FAMILY);
239         Result result = regions[i].get(get, null);
240         byte [] bytes = result.sorted()[0].getValue();
241         assertNotNull(bytes);
242         assertTrue(Bytes.equals(bytes, rows[i][j]));
243       }
244       // Close the region and delete the log
245       regions[i].close();
246       regions[i].getLog().closeAndDelete();
247     }
248 
249     // Create a log that we can reuse when we need to open regions
250     Path logPath = new Path("/tmp", HConstants.HREGION_LOGDIR_NAME + "_" +
251       System.currentTimeMillis());
252     LOG.info("Creating log " + logPath.toString());
253     Path oldLogDir = new Path("/tmp", HConstants.HREGION_OLDLOGDIR_NAME);
254     HLog log = new HLog(this.fs, logPath, oldLogDir, this.conf);
255     try {
256        // Merge Region 0 and Region 1
257       HRegion merged = mergeAndVerify("merging regions 0 and 1",
258         this.sourceRegions[0].getRegionNameAsString(),
259         this.sourceRegions[1].getRegionNameAsString(), log, 2);
260 
261       // Merge the result of merging regions 0 and 1 with region 2
262       merged = mergeAndVerify("merging regions 0+1 and 2",
263         merged.getRegionInfo().getRegionNameAsString(),
264         this.sourceRegions[2].getRegionNameAsString(), log, 3);
265 
266       // Merge the result of merging regions 0, 1 and 2 with region 3
267       merged = mergeAndVerify("merging regions 0+1+2 and 3",
268         merged.getRegionInfo().getRegionNameAsString(),
269         this.sourceRegions[3].getRegionNameAsString(), log, 4);
270 
271       // Merge the result of merging regions 0, 1, 2 and 3 with region 4
272       merged = mergeAndVerify("merging regions 0+1+2+3 and 4",
273         merged.getRegionInfo().getRegionNameAsString(),
274         this.sourceRegions[4].getRegionNameAsString(), log, rows.length);
275     } finally {
276       log.closeAndDelete();
277     }
278   }
279 }