/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

/**
 * Test cases for the "load" half of the HFileOutputFormat bulk load
 * functionality. These tests run faster than the full MR cluster
 * tests in TestHFileOutputFormat.
 */
public class TestLoadIncrementalHFiles {

  private static final byte[] TABLE = Bytes.toBytes("mytable");
  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
  private static final byte[] FAMILY = Bytes.toBytes("myfam");

  private static final byte[][] SPLIT_KEYS = new byte[][] {
    Bytes.toBytes("ddd"),
    Bytes.toBytes("ppp")
  };

  public static int BLOCKSIZE = 64 * 1024;
  public static String COMPRESSION =
    Compression.Algorithm.NONE.getName();

  private HBaseTestingUtility util = new HBaseTestingUtility();

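  /*
   * For reference, a minimal sketch of how these tests drive the bulk
   * load client (assuming "conf" is a Configuration and "table" is an
   * HTable for an existing table with a matching column family):
   *
   *   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
   *   loader.doBulkLoad(new Path("/path/to/hfiles"), table);
   */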
  /**
   * Test case that creates some regions and loads
   * HFiles that fit snugly inside those regions.
   */
  @Test
  public void testSimpleLoad() throws Exception {
    runTest("testSimpleLoad",
        new byte[][][] {
          new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
    });
  }

  /**
   * Test case that creates some regions and loads
   * HFiles that cross the boundaries of those regions.
   */
  @Test
  public void testRegionCrossingLoad() throws Exception {
    runTest("testRegionCrossingLoad",
        new byte[][][] {
          new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
    });
  }

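  /**
   * Creates one HFile per key range under a single family directory,
   * starts a mini cluster with a table pre-split at SPLIT_KEYS, bulk
   * loads the files, and asserts that every written row is readable.
   */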
  private void runTest(String testName, byte[][][] hfileRanges)
  throws Exception {
    Path dir = HBaseTestingUtility.getTestDir(testName);
    FileSystem fs = util.getTestFileSystem();
    dir = dir.makeQualified(fs);
    Path familyDir = new Path(dir, Bytes.toString(FAMILY));

    int hfileIdx = 0;
    for (byte[][] range : hfileRanges) {
      byte[] from = range[0];
      byte[] to = range[1];
      createHFile(fs, new Path(familyDir, "hfile_" + hfileIdx++),
          FAMILY, QUALIFIER, from, to, 1000);
    }
    int expectedRows = hfileIdx * 1000;

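    // Bring up a mini cluster and create the target table, pre-split
    // at SPLIT_KEYS so the loaded HFiles map onto multiple regions.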
    util.startMiniCluster();
    try {
      HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
      HTableDescriptor htd = new HTableDescriptor(TABLE);
      htd.addFamily(new HColumnDescriptor(FAMILY));
      admin.createTable(htd, SPLIT_KEYS);

      HTable table = new HTable(util.getConfiguration(), TABLE);
      util.waitTableAvailable(TABLE, 30000);
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(
          util.getConfiguration());
      loader.doBulkLoad(dir, table);

      assertEquals(expectedRows, util.countRows(table));
    } finally {
      util.shutdownMiniCluster();
    }
  }

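  /**
   * Splits a single HFile at the row key "ggg" and verifies that the
   * bottom and top halves together still contain all 1000 rows.
   */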
  @Test
  public void testSplitStoreFile() throws IOException {
    Path dir = HBaseTestingUtility.getTestDir("testSplitHFile");
    FileSystem fs = util.getTestFileSystem();
    Path testIn = new Path(dir, "testhfile");
    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
    createHFile(fs, testIn, FAMILY, QUALIFIER,
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);

    Path bottomOut = new Path(dir, "bottom.out");
    Path topOut = new Path(dir, "top.out");

    LoadIncrementalHFiles.splitStoreFile(
        util.getConfiguration(), testIn,
        familyDesc, Bytes.toBytes("ggg"),
        bottomOut,
        topOut);

    int rowCount = verifyHFile(bottomOut);
    rowCount += verifyHFile(topOut);
    assertEquals(1000, rowCount);
  }

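  /**
   * Opens the HFile at the given path and returns the number of
   * key-values it contains, asserting that it is non-empty.
   */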
  private int verifyHFile(Path p) throws IOException {
    Configuration conf = util.getConfiguration();
    HFile.Reader reader = new HFile.Reader(
        p.getFileSystem(conf), p, null, false);
    try {
      reader.loadFileInfo();
      HFileScanner scanner = reader.getScanner(false, false);
      // seekTo() returns false on an empty file; without this check the
      // do/while below would count one phantom row.
      assertTrue(scanner.seekTo());
      int count = 0;
      do {
        count++;
      } while (scanner.next());
      assertTrue(count > 0);
      return count;
    } finally {
      reader.close();
    }
  }

  /**
   * Create an HFile with the given number of rows between a given
   * start key and end key.
   * TODO put me in an HFileTestUtil or something?
   */
  static void createHFile(
      FileSystem fs, Path path,
      byte[] family, byte[] qualifier,
      byte[] startKey, byte[] endKey, int numRows) throws IOException {
    HFile.Writer writer = new HFile.Writer(fs, path, BLOCKSIZE, COMPRESSION,
        KeyValue.KEY_COMPARATOR);
    long now = System.currentTimeMillis();
    try {
      // subtract 2 since iterateOnSplits doesn't include boundary keys
      for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows - 2)) {
        KeyValue kv = new KeyValue(key, family, qualifier, now, key);
        writer.append(kv);
      }
    } finally {
      writer.close();
    }
  }
}