package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, can only write files to a single column family at a time;
 * writing several families would require coordinating row boundaries across
 * the per-family writers. Stamps cells that carry LATEST_TIMESTAMP with the
 * task start time, and sets the major-compaction attribute on the HFiles it
 * creates.
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat
    extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static Log LOG = LogFactory.getLog(HFileOutputFormat.class);

  @Override
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(
      final TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Write into the task attempt's work directory so output is committed
    // atomically on task success.
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir =
      new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // Roll to a new HFile once the configured region size is reached;
    // 268435456 bytes = 256MB.
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize =
      conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());
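
    // The three knobs above come from the job configuration; a driver may
    // tune them before submit. The values below are illustrative assumptions,
    // not defaults of this class:
    //   conf.setLong("hbase.hregion.max.filesize", 1024L * 1024 * 1024);
    //   conf.setInt("hbase.mapreduce.hfileoutputformat.blocksize", 131072);
    //   conf.set("hfile.compression", Compression.Algorithm.GZ.getName());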

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // One writer per column family, ordered by family name.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        // Roll a new HFile when there is no writer yet, or when the current
        // file has reached the maximum size AND this KeyValue starts a new
        // row: a row must never be split across HFiles.
        if (wl == null || (((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0)) {
          // HFiles for a family live in a subdirectory named after it.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) {
              throw new IOException(
                "This format currently supports a single column family only");
            }
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          LOG.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        // Fill in cells that still carry LATEST_TIMESTAMP with the task
        // start time, then account for the bytes written.
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Remember the row so the next call can detect a row boundary.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer, first closing the current one if there
       * is one.
       * @return A new HFile.Writer on a uniquely named file under familydir.
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir)
      throws IOException {
        close(writer);
        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }
131
132 private void close(final HFile.Writer w) throws IOException {
133 if (w != null) {
134 w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
135 Bytes.toBytes(System.currentTimeMillis()));
136 w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
137 Bytes.toBytes(context.getTaskAttemptID().toString()));
138 w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
139 Bytes.toBytes(true));
140 w.close();
141 }
142 }
143
144 public void close(TaskAttemptContext c)
145 throws IOException, InterruptedException {
146 for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
147 close(e.getValue().writer);
148 }
149 }
150 };
151 }
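
  // Resulting layout under the task's work directory (a sketch; the family
  // name "cf" and the hex file names are illustrative -- actual names come
  // from StoreFile.getUniqueFile):
  //   <outputdir>/cf/2c0366de8f634aa5a4bfc07f71f4f5d7
  //   <outputdir>/cf/7e3a6b21c4e94d1e9f0cbb1f0a1c2d3e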

  /*
   * Data structure to hold a Writer and the number of bytes written to it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
  throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }
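
  // Worked example (hypothetical table layout): a table with regions
  // ["", "bbb"), ["bbb", "mmm") and ["mmm", "") yields the start keys
  // {"", "bbb", "mmm"}, one entry per region, so configureIncrementalLoad
  // below ends up configuring three reduce partitions.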

  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * and that contains the split points in startKeys.
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and there are never keys
    // smaller than the first region's start key (which is empty), so that
    // key is removed. Otherwise reducer 0 would always be empty.
    TreeSet<ImmutableBytesWritable> sorted =
      new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY))) {
      throw new IllegalArgumentException(
        "First region of table should have empty start key. Instead has: "
        + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the remaining start keys, in order, as the split points.
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
      conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
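
  // How the file written above is consumed (a sketch of the
  // TotalOrderPartitioner contract): with N reduce tasks the partitioner
  // expects N - 1 sorted split points and routes a key to the reducer whose
  // range contains it. E.g. with split points {"bbb", "mmm"}:
  //   k < "bbb"           -> reducer 0
  //   "bbb" <= k < "mmm"  -> reducer 1
  //   k >= "mmm"          -> reducer 2
  // so each reducer produces HFiles covering exactly one region's key range.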

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the
   *     DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of
   *     regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's
   *     requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either
   *     KeyValueSortReducer or PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either
   * KeyValue or Put before running this function.
   */
  public static void configureIncrementalLoad(Job job, HTable table)
  throws IOException {
    Configuration conf = job.getConfiguration();
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output value class, pick the reducer that
    // sorts values so KeyValues reach the record writer in order.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " +
      Bytes.toString(table.getTableName()));
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
      "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(),
      "partitions_" + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    // Note: makeQualified returns a new Path rather than mutating this one,
    // so the result must be assigned back.
    partitionsPath = partitionsPath.makeQualified(fs);
    URI cacheUri;
    try {
      // The URI fragment makes the cached file appear in each task's working
      // directory under the name TotalOrderPartitioner expects.
      cacheUri = new URI(partitionsPath.toString() + "#" +
        TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    LOG.info("Incremental table output configured.");
  }
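
  /* Usage sketch (illustrative; the driver class, mapper class, paths and
   * table name below are hypothetical, not part of this API):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   Job job = new Job(conf, "bulk-load-example");
   *   job.setJarByClass(MyBulkLoadDriver.class);
   *   job.setMapperClass(MyPutEmittingMapper.class);
   *   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setMapOutputValueClass(Put.class);
   *   FileInputFormat.addInputPath(job, new Path("/input"));
   *   FileOutputFormat.setOutputPath(job, new Path("/bulk-load-out"));
   *   HTable table = new HTable(conf, "my_table");
   *   HFileOutputFormat.configureIncrementalLoad(job, table);
   *   job.waitForCompletion(true);
   *
   * The HFiles left under /bulk-load-out can then be handed to the bulk
   * load tool (LoadIncrementalHFiles) to be moved into the table's regions.
   */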

}