View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.mapred;
21  
22  import java.io.IOException;
23  
24  import org.apache.commons.logging.Log;
25  import org.apache.commons.logging.LogFactory;
26  import org.apache.hadoop.hbase.HConstants;
27  import org.apache.hadoop.hbase.client.HTable;
28  import org.apache.hadoop.hbase.client.Result;
29  import org.apache.hadoop.hbase.filter.Filter;
30  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31  import org.apache.hadoop.hbase.regionserver.HRegion;
32  import org.apache.hadoop.mapred.InputFormat;
33  import org.apache.hadoop.mapred.InputSplit;
34  import org.apache.hadoop.mapred.JobConf;
35  import org.apache.hadoop.mapred.RecordReader;
36  import org.apache.hadoop.mapred.Reporter;
37  
38  /**
39   * A Base for {@link TableInputFormat}s. Receives a {@link HTable}, a
40   * byte[] of input columns and optionally a {@link Filter}.
41   * Subclasses may use other TableRecordReader implementations.
42   * <p>
43   * An example of a subclass:
44   * <pre>
45   *   class ExampleTIF extends TableInputFormatBase implements JobConfigurable {
46   *
47   *     public void configure(JobConf job) {
48   *       HTable exampleTable = new HTable(HBaseConfiguration.create(job),
49   *         Bytes.toBytes("exampleTable"));
50   *       // mandatory
51   *       setHTable(exampleTable);
52   *       Text[] inputColumns = new byte [][] { Bytes.toBytes("columnA"),
53   *         Bytes.toBytes("columnB") };
54   *       // mandatory
55   *       setInputColumns(inputColumns);
56   *       RowFilterInterface exampleFilter = new RegExpRowFilter("keyPrefix.*");
57   *       // optional
58   *       setRowFilter(exampleFilter);
59   *     }
60   *
61   *     public void validateInput(JobConf job) throws IOException {
62   *     }
63   *  }
64   * </pre>
65   */
66  
67  @Deprecated
68  public abstract class TableInputFormatBase
69  implements InputFormat<ImmutableBytesWritable, Result> {
70    final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
71    private byte [][] inputColumns;
72    private HTable table;
73    private TableRecordReader tableRecordReader;
74    private Filter rowFilter;
75  
76    /**
77     * Builds a TableRecordReader. If no TableRecordReader was provided, uses
78     * the default.
79     *
80     * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
81     *      JobConf, Reporter)
82     */
83    public RecordReader<ImmutableBytesWritable, Result> getRecordReader(
84        InputSplit split, JobConf job, Reporter reporter)
85    throws IOException {
86      TableSplit tSplit = (TableSplit) split;
87      TableRecordReader trr = this.tableRecordReader;
88      // if no table record reader was provided use default
89      if (trr == null) {
90        trr = new TableRecordReader();
91      }
92      trr.setStartRow(tSplit.getStartRow());
93      trr.setEndRow(tSplit.getEndRow());
94      trr.setHTable(this.table);
95      trr.setInputColumns(this.inputColumns);
96      trr.setRowFilter(this.rowFilter);
97      trr.init();
98      return trr;
99    }
100 
101   /**
102    * Calculates the splits that will serve as input for the map tasks.
103    * <ul>
104    * Splits are created in number equal to the smallest between numSplits and
105    * the number of {@link HRegion}s in the table. If the number of splits is
106    * smaller than the number of {@link HRegion}s then splits are spanned across
107    * multiple {@link HRegion}s and are grouped the most evenly possible. In the
108    * case splits are uneven the bigger splits are placed first in the
109    * {@link InputSplit} array.
110    *
111    * @param job the map task {@link JobConf}
112    * @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
113    *
114    * @return the input splits
115    *
116    * @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
117    */
118   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
119     if (this.table == null) {
120       throw new IOException("No table was provided");
121     }
122     byte [][] startKeys = this.table.getStartKeys();
123     if (startKeys == null || startKeys.length == 0) {
124       throw new IOException("Expecting at least one region");
125     }
126     if (this.inputColumns == null || this.inputColumns.length == 0) {
127       throw new IOException("Expecting at least one column");
128     }
129     int realNumSplits = numSplits > startKeys.length? startKeys.length:
130       numSplits;
131     InputSplit[] splits = new InputSplit[realNumSplits];
132     int middle = startKeys.length / realNumSplits;
133     int startPos = 0;
134     for (int i = 0; i < realNumSplits; i++) {
135       int lastPos = startPos + middle;
136       lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
137       String regionLocation = table.getRegionLocation(startKeys[startPos]).
138         getServerAddress().getHostname();
139       splits[i] = new TableSplit(this.table.getTableName(),
140         startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
141           HConstants.EMPTY_START_ROW, regionLocation);
142       LOG.info("split: " + i + "->" + splits[i]);
143       startPos = lastPos;
144     }
145     return splits;
146   }
147 
148   /**
149    * @param inputColumns to be passed in {@link Result} to the map task.
150    */
151   protected void setInputColumns(byte [][] inputColumns) {
152     this.inputColumns = inputColumns;
153   }
154 
155   /**
156    * Allows subclasses to get the {@link HTable}.
157    */
158   protected HTable getHTable() {
159     return this.table;
160   }
161 
162   /**
163    * Allows subclasses to set the {@link HTable}.
164    *
165    * @param table to get the data from
166    */
167   protected void setHTable(HTable table) {
168     this.table = table;
169   }
170 
171   /**
172    * Allows subclasses to set the {@link TableRecordReader}.
173    *
174    * @param tableRecordReader
175    *                to provide other {@link TableRecordReader} implementations.
176    */
177   protected void setTableRecordReader(TableRecordReader tableRecordReader) {
178     this.tableRecordReader = tableRecordReader;
179   }
180 
181   /**
182    * Allows subclasses to set the {@link Filter} to be used.
183    *
184    * @param rowFilter
185    */
186   protected void setRowFilter(Filter rowFilter) {
187     this.rowFilter = rowFilter;
188   }
189 }