/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
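/**
 * Tool to load the output of HFileOutputFormat into an existing table.
 */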
public class LoadIncrementalHFiles extends Configured implements Tool {

  static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);

  public static final String NAME = "completebulkload";

  public LoadIncrementalHFiles(Configuration conf) {
    super(conf);
  }

  public LoadIncrementalHFiles() {
    super();
  }

  private void usage() {
    System.err.println("usage: " + NAME +
        " /path/to/hfileoutputformat-output " +
        "tablename");
  }
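  /**
   * Represents an HFile waiting to be loaded. An entry is tracked as a
   * (family, hfile path) pair; if a region boundary no longer contains an
   * HFile, the file is split and both halves are added back to the queue.
   */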
  private static class LoadQueueItem {
    final byte[] family;
    final Path hfilePath;

    public LoadQueueItem(byte[] family, Path hfilePath) {
      this.family = family;
      this.hfilePath = hfilePath;
    }
  }
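  /**
   * Walk the given directory of HFileOutputFormat output, and return a queue
   * of the HFiles found there, one entry per (family, hfile) pair.
   */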
  private Deque<LoadQueueItem> discoverLoadQueue(Path hfofDir)
      throws IOException {
    FileSystem fs = hfofDir.getFileSystem(getConf());

    if (!fs.exists(hfofDir)) {
      throw new FileNotFoundException("HFileOutputFormat dir " +
          hfofDir + " not found");
    }

    FileStatus[] familyDirStatuses = fs.listStatus(hfofDir);
    if (familyDirStatuses == null) {
      throw new FileNotFoundException("No families found in " + hfofDir);
    }

    Deque<LoadQueueItem> ret = new LinkedList<LoadQueueItem>();
    for (FileStatus stat : familyDirStatuses) {
      if (!stat.isDir()) {
        LOG.warn("Skipping non-directory " + stat.getPath());
        continue;
      }
      Path familyDir = stat.getPath();
      // Skip "_"-prefixed directories such as _logs; they are not families
      if (familyDir.getName().startsWith("_")) continue;
      byte[] family = Bytes.toBytes(familyDir.getName());
      Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
      for (Path hfile : hfiles) {
        // Skip "_"-prefixed entries such as _tmp; they are not HFiles
        if (hfile.getName().startsWith("_")) continue;
        ret.add(new LoadQueueItem(family, hfile));
      }
    }
    return ret;
  }
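  /**
   * Perform a bulk load of the given directory into the given
   * pre-existing table.
   * @param hfofDir the directory that was provided as the output path
   *   of a job using HFileOutputFormat
   * @param table the table to load into
   * @throws TableNotFoundException if table does not yet exist
   */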
  public void doBulkLoad(Path hfofDir, HTable table)
      throws TableNotFoundException, IOException {
    HConnection conn = table.getConnection();

    if (!conn.isTableAvailable(table.getTableName())) {
      throw new TableNotFoundException("Table " +
          Bytes.toStringBinary(table.getTableName()) +
          " is not currently available.");
    }

    Deque<LoadQueueItem> queue = null;
    try {
      queue = discoverLoadQueue(hfofDir);
      while (!queue.isEmpty()) {
        LoadQueueItem item = queue.remove();
        tryLoad(item, conn, table.getTableName(), queue);
      }
    } finally {
      // Anything still in the queue at this point failed to load
      if (queue != null && !queue.isEmpty()) {
        StringBuilder err = new StringBuilder();
        err.append("-------------------------------------------------\n");
        err.append("Bulk load aborted with some files not yet loaded:\n");
        err.append("-------------------------------------------------\n");
        for (LoadQueueItem q : queue) {
          err.append("  ").append(q.hfilePath).append('\n');
        }
        LOG.error(err);
      }
    }
  }
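  /**
   * Attempt to load the given load queue item into its target region server.
   * If the HFile boundary no longer fits into a single region, physically
   * splits the HFile along the region's end key and adds the two resulting
   * halves back into the load queue.
   */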
  private void tryLoad(final LoadQueueItem item,
      HConnection conn, final byte[] table,
      final Deque<LoadQueueItem> queue)
      throws IOException {
    final Path hfilePath = item.hfilePath;
    final FileSystem fs = hfilePath.getFileSystem(getConf());
    // Read the first and last row keys so we can tell whether the file
    // still fits within a single region
    HFile.Reader hfr = new HFile.Reader(fs, hfilePath, null, false);
    final byte[] first, last;
    try {
      hfr.loadFileInfo();
      first = hfr.getFirstRowKey();
      last = hfr.getLastRowKey();
    } finally {
      hfr.close();
    }

    if (first == null || last == null) {
      assert first == null && last == null;
      LOG.info("hfile " + hfilePath + " has no entries, skipping");
      return;
    }

    LOG.info("Trying to load hfile=" + hfilePath +
        " first=" + Bytes.toStringBinary(first) +
        " last=" + Bytes.toStringBinary(last));
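    // Use a "_" prefix for the tmp directory so that split output is
    // skipped when discoverLoadQueue walks the directory tree above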
    final Path tmpDir = new Path(item.hfilePath.getParent(), "_tmp");

    conn.getRegionServerWithRetries(
      new ServerCallable<Void>(conn, table, first) {
        @Override
        public Void call() throws Exception {
          LOG.debug("Going to connect to server " + location +
              " for row " + Bytes.toStringBinary(row));
          HRegionInfo hri = location.getRegionInfo();
          if (!hri.containsRange(first, last)) {
            LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
                "region. Splitting...");

            HColumnDescriptor familyDesc = hri.getTableDesc().getFamily(item.family);
            Path botOut = new Path(tmpDir, hri.getEncodedName() + ".bottom");
            Path topOut = new Path(tmpDir, hri.getEncodedName() + ".top");
            splitStoreFile(getConf(), hfilePath, familyDesc, hri.getEndKey(),
                botOut, topOut);
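            // Add these back at the *front* of the queue, so there's a lower
            // chance that the region will just split again before we load them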
            queue.addFirst(new LoadQueueItem(item.family, botOut));
            queue.addFirst(new LoadQueueItem(item.family, topOut));
            LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
            return null;
          }

          byte[] regionName = location.getRegionInfo().getRegionName();
          server.bulkLoadHFile(hfilePath.toString(), regionName, item.family);
          return null;
        }
      });
  }
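  /**
   * Split a storefile into a top and bottom half along the given split key,
   * carrying over the file metadata and rebuilding bloom filters.
   */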
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException {
    // Create references for the top and bottom halves around the split key
    Reference topReference = new Reference(splitKey, Range.top);
    Reference bottomReference = new Reference(splitKey, Range.bottom);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }
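  /**
   * Copy half of an HFile into a new HFile, using a HalfStoreFileReader
   * over the given Reference to select which half to read.
   */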
  private static void copyHFileHalf(
      Configuration conf, Path inFile, Path outFile, Reference reference,
      HColumnDescriptor familyDescriptor)
      throws IOException {
    FileSystem fs = inFile.getFileSystem(conf);
    HalfStoreFileReader halfReader = null;
    StoreFile.Writer halfWriter = null;
    try {
      halfReader = new HalfStoreFileReader(fs, inFile, null, reference);
      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();

      int blocksize = familyDescriptor.getBlocksize();
      Algorithm compression = familyDescriptor.getCompression();
      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();

      halfWriter = new StoreFile.Writer(
          fs, outFile, blocksize, compression, conf, KeyValue.COMPARATOR,
          bloomFilterType, 0);
      HFileScanner scanner = halfReader.getScanner(false, false);
      // Guard against an empty half: only iterate if seekTo finds an entry
      if (scanner.seekTo()) {
        do {
          KeyValue kv = scanner.getKeyValue();
          halfWriter.append(kv);
        } while (scanner.next());
      }

      // Carry over file metadata, skipping reserved HFile info keys
      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
        if (shouldCopyHFileMetaKey(entry.getKey())) {
          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
        }
      }
    } finally {
      if (halfWriter != null) halfWriter.close();
      if (halfReader != null) halfReader.close();
    }
  }

  private static boolean shouldCopyHFileMetaKey(byte[] key) {
    return !HFile.isReservedFileInfoKey(key);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      usage();
      return -1;
    }

    Path hfofDir = new Path(args[0]);
    HTable table = new HTable(this.getConf(), args[1]);

    doBulkLoad(hfofDir, table);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // Propagate the Tool's exit code to the shell rather than discarding it
    int ret = ToolRunner.run(new LoadIncrementalHFiles(HBaseConfiguration.create()), args);
    System.exit(ret);
  }

}