/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * A base for {@link TableInputFormat}s. Receives an {@link HTable} and a
 * {@link Scan} instance that defines the input columns, etc. Subclasses may
 * use other TableRecordReader implementations.
 * <p>
 * An example of a subclass:
 * <pre>
 *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
 *
 *     public void configure(JobConf job) {
 *       HTable exampleTable = new HTable(HBaseConfiguration.create(job),
 *         Bytes.toBytes("exampleTable"));
 *       // mandatory
 *       setHTable(exampleTable);
 *       byte[][] inputColumns = new byte[][] { Bytes.toBytes("columnA"),
 *         Bytes.toBytes("columnB") };
 *       Scan scan = new Scan();
 *       for (byte[] family : inputColumns) {
 *         scan.addFamily(family);
 *       }
 *       Filter exampleFilter = new RowFilter(CompareOp.EQUAL,
 *         new RegexStringComparator("keyPrefix.*"));
 *       // optional
 *       scan.setFilter(exampleFilter);
 *       // mandatory
 *       setScan(scan);
 *     }
 *
 *     public void validateInput(JobConf job) throws IOException {
 *     }
 *   }
 * </pre>
 */
public abstract class TableInputFormatBase
extends InputFormat<ImmutableBytesWritable, Result> {

  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);

  /** Holds the details for the internal scanner. */
  private Scan scan = null;
  /** The table to scan. */
  private HTable table = null;
  /** The reader scanning the table; can be a custom one. */
  private TableRecordReader tableRecordReader = null;

  /**
   * Builds a TableRecordReader. If no TableRecordReader was provided, uses
   * the default.
   *
   * @param split  The split to work with.
   * @param context  The current context.
   * @return The newly created record reader.
   * @throws IOException When creating the reader fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(
   *   org.apache.hadoop.mapreduce.InputSplit,
   *   org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
      InputSplit split, TaskAttemptContext context)
  throws IOException {
    if (table == null) {
      throw new IOException("Cannot create a record reader because of a" +
          " previous error. Please look at the previous log lines from" +
          " the task's full log for more details.");
    }
    TableSplit tSplit = (TableSplit) split;
    TableRecordReader trr = this.tableRecordReader;
    // if no table record reader was provided, use the default
    if (trr == null) {
      trr = new TableRecordReader();
    }
    // clone the scan and clip it to this split's row range
    Scan sc = new Scan(this.scan);
    sc.setStartRow(tSplit.getStartRow());
    sc.setStopRow(tSplit.getEndRow());
    trr.setScan(sc);
    trr.setHTable(table);
    trr.init();
    return trr;
  }

  /**
   * Calculates the splits that will serve as input for the map tasks: one
   * split per region that overlaps the scan's row range and is not excluded
   * by {@link #includeRegionInSplit(byte[], byte[])}.
   *
   * @param context  The current job context.
   * @return The list of input splits.
   * @throws IOException When creating the list of splits fails.
   * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(
   *   org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    if (table == null) {
      throw new IOException("No table was provided.");
    }
    Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
    if (keys == null || keys.getFirst() == null ||
        keys.getFirst().length == 0) {
      throw new IOException("Expecting at least one region.");
    }
    int count = 0;
    List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
    for (int i = 0; i < keys.getFirst().length; i++) {
      if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
        continue;
      }
      String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
        getServerAddress().getHostname();
      byte[] startRow = scan.getStartRow();
      byte[] stopRow = scan.getStopRow();
      // determine if the given start and stop keys fall into the region
      if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
           Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
          (stopRow.length == 0 ||
           Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
        // clip the split to the intersection of the scan range and the region
        byte[] splitStart = startRow.length == 0 ||
          Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
            keys.getFirst()[i] : startRow;
        byte[] splitStop = (stopRow.length == 0 ||
          Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
          keys.getSecond()[i].length > 0 ?
            keys.getSecond()[i] : stopRow;
        InputSplit split = new TableSplit(table.getTableName(),
          splitStart, splitStop, regionLocation);
        splits.add(split);
        if (LOG.isDebugEnabled()) {
          LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
        }
      }
    }
    return splits;
  }
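
  // Worked example of the clipping above, with hypothetical keys: given
  // regions ["", "b"), ["b", "d"), ["d", "") and a scan over ["a", "c"),
  // getSplits emits ["a", "b") on the first region and ["b", "c") on the
  // second; the third region is skipped because its start key "d" is not
  // less than the scan's stop row "c".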

  /**
   * Tests whether a given region should be included in the InputSplits while
   * splitting the regions of a table.
   * <p>
   * This optimization is effective when there is a specific reason to exclude
   * an entire region from the M-R job (and hence not have it contribute an
   * InputSplit), based on the region's start and end keys. It is useful when
   * a job needs to remember the last-processed top record and continuously
   * revisit the [last, current) interval: besides reducing the number of
   * InputSplits, it also reduces the load on the region servers, thanks to
   * the ordering of the keys.
   * <p>
   * Note: it is possible that <code>endKey.length == 0</code> for the last
   * (most recent) region of a table.
   * <p>
   * Override this method to bulk-exclude regions from the M-R job. By
   * default no region is excluded, i.e. all regions are included; a
   * commented sketch of an override follows this method.
   *
   * @param startKey Start key of the region
   * @param endKey End key of the region
   * @return true if this region should be included as part of the input
   *   (the default).
   */
  protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
    return true;
  }
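
  // A minimal sketch of an override, assuming a hypothetical field
  // "lastProcessedKey" that holds the last row key handled by a previous
  // run; regions that end at or before that key are skipped entirely:
  //
  //   @Override
  //   protected boolean includeRegionInSplit(byte[] startKey, byte[] endKey) {
  //     // an empty end key marks the last region, which is always kept
  //     return endKey.length == 0 ||
  //         Bytes.compareTo(endKey, lastProcessedKey) > 0;
  //   }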

  /**
   * Allows subclasses to get the {@link HTable}.
   *
   * @return The table to scan.
   */
  protected HTable getHTable() {
    return this.table;
  }

  /**
   * Allows subclasses to set the {@link HTable}.
   *
   * @param table  The table to get the data from.
   */
  protected void setHTable(HTable table) {
    this.table = table;
  }

  /**
   * Gets the scan defining the actual details like columns, etc.
   *
   * @return The internal scan instance.
   */
  public Scan getScan() {
    if (this.scan == null) this.scan = new Scan();
    return scan;
  }

  /**
   * Sets the scan defining the actual details like columns, etc.
   *
   * @param scan  The scan to set.
   */
  public void setScan(Scan scan) {
    this.scan = scan;
  }
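
  // A minimal sketch of a scan a job might pass in, with hypothetical
  // family/qualifier names; which options matter depends on the job:
  //
  //   Scan scan = new Scan();
  //   scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"));
  //   scan.setCaching(500);  // rows fetched per trip to the region server
  //   exampleTIF.setScan(scan);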

  /**
   * Allows subclasses to set the {@link TableRecordReader}.
   *
   * @param tableRecordReader A different {@link TableRecordReader}
   *   implementation.
   */
  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
    this.tableRecordReader = tableRecordReader;
  }
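
  // A minimal sketch of plugging in a custom reader from a subclass,
  // assuming a hypothetical ExampleRecordReader extending TableRecordReader;
  // createRecordReader above will then configure it with each split's
  // clipped scan and the table:
  //
  //   setTableRecordReader(new ExampleRecordReader());
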
}
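
// A minimal driver sketch wiring up a subclass such as the ExampleTIF from
// the class Javadoc (hypothetical mapper "ExampleMapper"); the job gets one
// map task per split returned by getSplits:
//
//   Job job = new Job(HBaseConfiguration.create(), "example-job");
//   job.setJarByClass(ExampleTIF.class);
//   job.setInputFormatClass(ExampleTIF.class);
//   job.setMapperClass(ExampleMapper.class);
//   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
//   job.setMapOutputValueClass(Result.class);
//   job.waitForCompletion(true);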