/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 * @see #usage()
 */
public class LoadIncrementalHFiles extends Configured implements Tool {

  static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);

  public static String NAME = "completebulkload";

  public LoadIncrementalHFiles(Configuration conf) {
    super(conf);
  }

  public LoadIncrementalHFiles() {
    super();
  }


  private void usage() {
    System.err.println("usage: " + NAME +
        " /path/to/hfileoutputformat-output " +
        "tablename");
  }

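  /*
   * Illustrative command-line invocation only (the exact jar name and
   * launcher depend on the HBase build); the two arguments match the
   * usage() text above:
   *
   *   hadoop jar hbase-VERSION.jar completebulkload \
   *       /path/to/hfileoutputformat-output tablename
   *
   * Alternatively the class can be run through ToolRunner, as main()
   * at the bottom of this file does.
   */
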
  /**
   * Represents an HFile waiting to be loaded. A queue is used
   * in this class in order to support the case where a region has
   * split during the process of the load. When this happens,
   * the HFile is split into two physical parts across the new
   * region boundary, and each part is added back into the queue.
   * The import process finishes when the queue is empty.
   */
  private static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }
  }

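  /*
   * Sketch of the queue lifecycle described above (file names illustrative):
   *
   *   queue: [hfileA]                     -> hfileA no longer fits one region
   *   queue: [hfileA.top, hfileA.bottom]  -> each half is retried on its own
   *   queue: []                           -> bulk load complete
   */
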
  /**
   * Walk the given directory for all HFiles, and return a Queue
   * containing all such files.
   */
  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
  throws IOException {
    FileSystem fs = hfofDir.getFileSystem(getConf());

    if (!fs.exists(hfofDir)) {
      throw new FileNotFoundException("HFileOutputFormat dir " +
          hfofDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + hfofDir);
    }

    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
    for (FileStatus stat : familyDirStatuses) {
      if (!stat.isDir()) {
        LOG.warn("Skipping non-directory " + stat.getPath());
        continue;
      }
      Path familyDir = stat.getPath();
      // Skip _logs, etc
      if (familyDir.getName().startsWith("_")) continue;
      // Use Bytes.toBytes rather than String.getBytes so the family name does
      // not depend on the platform default charset.
      byte[] family = Bytes.toBytes(familyDir.getName());
      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
      for (Path hfile : hfiles) {
        if (hfile.getName().startsWith("_")) continue;
        ret.add(new LoadQueueItem(family, hfile));
      }
    }
    return ret;
  }

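  /*
   * For reference, the layout walked above is the one produced by
   * HFileOutputFormat: one subdirectory per column family, each holding the
   * HFiles written for that family (names below are illustrative):
   *
   *   hfofDir/
   *     _logs/        <- skipped: "_" prefix
   *     f1/           <- column family "f1"
   *       2c0724...   <- an HFile
   *     f2/
   *       8ab113...
   */
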
  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.
   * @param hfofDir the directory that was provided as the output path
   * of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */
  public void doBulkLoad(Path hfofDir, HTable table)
    throws TableNotFoundException, IOException
  {
    HConnection conn = table.getConnection();

    if (!conn.isTableAvailable(table.getTableName())) {
      throw new TableNotFoundException("Table " +
          Bytes.toStringBinary(table.getTableName()) +
          " is not currently available.");
    }

    Deque<LoadQueueItem> queue = null;
    try {
      queue = discoverLoadQueue(hfofDir);
      while (!queue.isEmpty()) {
        LoadQueueItem item = queue.remove();
        tryLoad(item, conn, table.getTableName(), queue);
      }
    } finally {
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }
  }

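  /*
   * Minimal programmatic sketch of the method above (configuration, table
   * name, and path are illustrative); this mirrors what run() does:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HTable table = new HTable(conf, "mytable");
   *   new LoadIncrementalHFiles(conf).doBulkLoad(
   *       new Path("/path/to/hfileoutputformat-output"), table);
   */
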
  /**
   * Attempt to load the given load queue item into its target region server.
   * If the hfile boundary no longer fits into a region, physically splits
   * the hfile such that the new bottom half will fit, and adds the two
   * resultant hfiles back into the load queue.
   */
  private void tryLoad(final LoadQueueItem item,
      HConnection conn, final byte[] table,
      final Deque<LoadQueueItem> queue)
  throws IOException {
    final Path hfilePath = item.hfilePath;
    final FileSystem fs = hfilePath.getFileSystem(getConf());
    HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
    if (first == null || last == null) {
      assert first == null && last == null;
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return;
    }

    // We use a '_' prefix which is ignored when walking directory trees
    // above.
    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");

    conn.getRegionServerWithRetries(
      new ServerCallable<Void>(conn, table, first) {
        @Override
        public Void call() throws Exception {
          LOG.debug("Going to connect to server " + location +
              " for row " + Bytes.toStringBinary(row));
          HRegionInfo hri = location.getRegionInfo();
          if (!hri.containsRange(first, last)) {
            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
                "region. Splitting...");

            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
                botOut, topOut);

            // Add these back at the *front* of the queue, so there's a lower
            // chance that the region will just split again before we get there.
            queue.addFirst(new LoadQueueItem(item.family, botOut));
            queue.addFirst(new LoadQueueItem(item.family, topOut));
            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
            return null;
          }

          byte[] regionName = location.getRegionInfo().getRegionName();
          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
          return null;
        }
      });
  }

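  /*
   * Worked example of the split path above (row keys illustrative): suppose
   * the hfile spans rows "d".."m", but the region located for row "d" now
   * only covers ["a","h") because the table split after the hfile was
   * written. containsRange fails, the file is split at the region end key
   * "h" into a "d".."g" bottom half and an "h".."m" top half, and both
   * halves are re-queued to be routed to the matching regions on a later
   * pass.
   */
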
  /**
   * Split a storefile into a top and bottom half, maintaining
   * the metadata, recreating bloom filters, etc.
   */
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException
  {
    // Open reader with no block cache, and not in-memory
    Reference topReference = new Reference(splitKey, Range.top);
    Reference bottomReference = new Reference(splitKey, Range.bottom);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

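  /*
   * Note on the two References created above: Range.top selects the half of
   * the source file at or above the split key and Range.bottom the half
   * below it; copyHFileHalf() then rewrites each referenced half into a
   * standalone HFile.
   */
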
  /**
   * Copy half of an HFile into a new HFile.
   */
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
  throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();

      halfWriter = new StoreFile.Writer(
          fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
          bloomFilterType, 0);
      HFileScanner scanner = halfReader.getScanner(false, false);
      // Guard against an empty half: only iterate if seekTo() positions the
      // scanner on a first entry.
      if (scanner.seekTo()) {
        do {
          KeyValue kv = scanner.getKeyValue();
          halfWriter.append(kv);
        } while (scanner.next());
      }

      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close();
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    return !HFile.isReservedFileInfoKey(key);
  }


  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    Path hfofDir = new Path(args[0]);
    HTable table = new HTable(this.getConf(), args[1]);

    doBulkLoad(hfofDir, table);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // Propagate the tool's return code so failures are visible to callers.
    System.exit(ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args));
  }

}