/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Writes HFiles. Passed KeyValues must arrive in order.
 * Currently, this can only write files for a single column family at a
 * time; writing multiple column families would require coordinating key
 * ranges across families. Writes the current time as the sequence id for
 * the file, and sets the major-compacted attribute on created hfiles.
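 *
 * <p>A minimal sketch of a mapper feeding this format; the input types,
 * family "f", and qualifier "q" below are illustrative only:
 * <pre>
 * static class ExampleMapper
 *     extends Mapper&lt;LongWritable, Text, ImmutableBytesWritable, KeyValue&gt; {
 *   protected void map(LongWritable key, Text value, Context context)
 *       throws IOException, InterruptedException {
 *     byte [] row = Bytes.toBytes(value.toString());
 *     KeyValue kv = new KeyValue(row, Bytes.toBytes("f"),
 *         Bytes.toBytes("q"), row);
 *     context.write(new ImmutableBytesWritable(row), kv);
 *   }
 * }
 * </pre>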
 * @see KeyValueSortReducer
 */
public class HFileOutputFormat extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
  static final Log LOG = LogFactory.getLog(HFileOutputFormat.class);

  @Override
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
  throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs are picked up from hbase-*.xml.
    final long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
    final int blocksize =
      conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
    // Invented config.  Add it to hbase-*.xml to use other than the default compression.
    final String compression = conf.get("hfile.compression",
      Compression.Algorithm.NONE.getName());
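    // For reference, a sketch of what the hbase-site.xml entry read above
    // could look like (the value "gz" assumes gzip support is available):
    //   <property>
    //     <name>hfile.compression</name>
    //     <value>gz</value>
    //   </property>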
    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      private final Map<byte [], WriterLength> writers =
        new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte [] now = Bytes.toBytes(System.currentTimeMillis());

      public void write(ImmutableBytesWritable row, KeyValue kv)
      throws IOException {
        long length = kv.getLength();
        byte [] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);
        // Roll to a new file when this family has no writer yet, or when
        // the size limit has been crossed and we are at a row boundary.
        if (wl == null || (((length + wl.written) >= maxsize) &&
            Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
              kv.getBuffer(), kv.getRowOffset(), kv.getRowLength()) != 0)) {
          // Get a new writer.
          Path basedir = new Path(outputdir, Bytes.toString(family));
          if (wl == null) {
            // First file in this family.  Ensure the family dir exists.
            wl = new WriterLength();
            this.writers.put(family, wl);
            if (this.writers.size() > 1) throw new IOException("One family only");
            if (!fs.exists(basedir)) fs.mkdirs(basedir);
          }
          wl.writer = getNewWriter(wl.writer, basedir);
          LOG.info("Writer=" + wl.writer.getPath() +
            ((wl.written == 0)? "": ", wrote=" + wl.written));
          wl.written = 0;
        }
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;
        // Copy the row so we know when a row transitions.
        this.previousRow = kv.getRow();
      }

      /* Create a new HFile.Writer.  Close the current one if there is one.
       * @param writer the current writer; closed here if non-null
       * @param familydir the directory in which the new file is created
       * @return A new HFile.Writer.
       * @throws IOException
       */
      private HFile.Writer getNewWriter(final HFile.Writer writer,
          final Path familydir)
      throws IOException {
        close(writer);
        return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
          blocksize, compression, KeyValue.KEY_COMPARATOR);
      }

      private void close(final HFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
      throws IOException, InterruptedException {
        for (Map.Entry<byte [], WriterLength> e: this.writers.entrySet()) {
          close(e.getValue().writer);
        }
      }
    };
  }

  /*
   * Data structure to hold a Writer and amount of data written on it.
   */
  static class WriterLength {
    long written = 0;
    HFile.Writer writer = null;
  }

  /**
   * Return the start keys of all of the regions in this table,
   * as a list of ImmutableBytesWritable.
   */
  private static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
  throws IOException {
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
      new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      ret.add(new ImmutableBytesWritable(byteKey));
    }
    return ret;
  }

  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * and that contains the split points in startKeys.
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  private static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key),
    // so we need to remove it.  Otherwise we would end up with an
    // empty reducer at index 0.
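    // For example, a table with region start keys [ "", "b", "m" ] yields
    // split points [ "b", "m" ]: TotalOrderPartitioner then routes rows
    // before "b" to reducer 0, rows in [ "b", "m" ) to reducer 1, and the
    // rest to reducer 2, one reducer per region.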
    TreeSet<ImmutableBytesWritable> sorted =
      new TreeSet<ImmutableBytesWritable>(startKeys);

    ImmutableBytesWritable first = sorted.first();
    if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
          + Bytes.toStringBinary(first.get()));
    }
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }

  /**
   * Configure a MapReduce Job to perform an incremental load into the given
   * table. This
   * <ul>
   *   <li>Inspects the table to configure a total order partitioner</li>
   *   <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
   *   <li>Sets the number of reduce tasks to match the current number of regions</li>
   *   <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
   *   <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
   *     PutSortReducer)</li>
   * </ul>
   * The user should be sure to set the map output value class to either KeyValue or Put before
   * running this function; a sketch of a driver follows.
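   *
   * <p>A minimal driver sketch, assuming an existing table "mytable" and a
   * mapper ExampleMapper that emits Put values (both names, the paths, and
   * the exact HTable constructor form are illustrative and vary by version):
   * <pre>
   * Configuration conf = HBaseConfiguration.create();
   * Job job = new Job(conf, "incremental load");
   * job.setJarByClass(ExampleMapper.class);
   * job.setMapperClass(ExampleMapper.class);
   * job.setMapOutputKeyClass(ImmutableBytesWritable.class);
   * job.setMapOutputValueClass(Put.class);
   * FileInputFormat.addInputPath(job, new Path("/input"));
   * FileOutputFormat.setOutputPath(job, new Path("/bulkoutput"));
   * HTable table = new HTable(conf, "mytable");
   * HFileOutputFormat.configureIncrementalLoad(job, table);
   * job.waitForCompletion(true);
   * </pre>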
   */
  public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO: it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
    }

    LOG.info("Looking up current regions for table " + table);
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
        "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    Path partitionsPath = new Path(job.getWorkingDirectory(),
        "partitions_" + System.currentTimeMillis());
    LOG.info("Writing partition information to " + partitionsPath);

    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, startKeys);
    // Path.makeQualified returns a new Path rather than mutating this one,
    // so keep the result; discarding it would silently lose the qualification.
    partitionsPath = partitionsPath.makeQualified(fs);
    URI cacheUri;
    try {
      cacheUri = new URI(partitionsPath.toString() + "#" +
          TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    LOG.info("Incremental table output configured.");
  }

}