
/**
 * Copyright 2007 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
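 *
 * <p>A minimal usage sketch; the job name, table name {@code "mytable"},
 * and column family {@code "myfamily"} below are hypothetical placeholders,
 * not part of this class:
 *
 * <pre>
 * Job job = new Job(new Configuration(), "ExampleRead");
 * Configuration conf = job.getConfiguration();
 * conf.set(TableInputFormat.INPUT_TABLE, "mytable");
 * conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "myfamily");
 * job.setInputFormatClass(TableInputFormat.class);
 * </pre>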
 */
public class TableInputFormat extends TableInputFormatBase
implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableInputFormat.class);

  /** Job parameter that specifies the input table. */
  public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
  /** Base-64 encoded scanner. All other {@code SCAN_} configuration
   * properties are ignored if this is specified.
   * See {@link TableMapReduceUtil#convertScanToString(Scan)} for more details.
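   *
   * <p>This key is normally not set by hand; a hedged sketch of the usual
   * route, where the table name, mapper, and output key/value classes are
   * hypothetical placeholders:
   *
   * <pre>
   * TableMapReduceUtil.initTableMapperJob("mytable", new Scan(),
   *   MyMapper.class, ImmutableBytesWritable.class, Result.class, job);
   * </pre>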
   */
  public static final String SCAN = "hbase.mapreduce.scan";
  /** The column family to scan. */
  public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
  /** Space-delimited list of columns to scan, each given as
   * {@code family:qualifier}. */
  public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
  /** The timestamp used to restrict the scan to cells with this exact timestamp. */
  public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
  /** The starting timestamp, inclusive, of the time range used to filter cells. */
  public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
  /** The ending timestamp, exclusive, of the time range used to filter cells. */
  public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
  /** The maximum number of versions to return. */
  public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
  /** Set to false to disable server-side caching of blocks for this scan. */
  public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
  /** The number of rows to cache per fetch, passed to scanners via
   * {@link Scan#setCaching(int)}. */
  public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Returns the current configuration.
   *
   * @return The current configuration.
   * @see org.apache.hadoop.conf.Configurable#getConf()
   */
  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Sets the configuration. This is used to set the details for the table to
   * be scanned.
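   *
   * <p>For illustration, a hedged sketch of the scan-related keys this
   * method reads; the table name and timestamp values are hypothetical:
   *
   * <pre>
   * configuration.set(TableInputFormat.INPUT_TABLE, "mytable");
   * configuration.set(TableInputFormat.SCAN_TIMERANGE_START, "1199145600000");
   * configuration.set(TableInputFormat.SCAN_TIMERANGE_END, "1230768000000");
   * configuration.set(TableInputFormat.SCAN_MAXVERSIONS, "3");
   * </pre>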
   *
   * @param configuration  The configuration to set.
   * @see org.apache.hadoop.conf.Configurable#setConf(
   *   org.apache.hadoop.conf.Configuration)
   */
  @Override
  public void setConf(Configuration configuration) {
    this.conf = configuration;
    String tableName = conf.get(INPUT_TABLE);
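    // Open the input table with a private copy of the configuration so that
    // later changes to this job's configuration do not leak into the table
    // connection.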
    try {
      setHTable(new HTable(new Configuration(conf), tableName));
    } catch (Exception e) {
      LOG.error("Failed to open table " + tableName + ": " +
        StringUtils.stringifyException(e));
    }

    Scan scan = null;

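    // A serialized scan takes precedence: when SCAN is present, every other
    // SCAN_* key is ignored (see the SCAN constant above).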
    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred while deserializing the scan.", e);
      }
    } else {
      try {
        scan = new Scan();

        if (conf.get(SCAN_COLUMNS) != null) {
          scan.addColumns(conf.get(SCAN_COLUMNS));
        }

        if (conf.get(SCAN_COLUMN_FAMILY) != null) {
          scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
        }

        if (conf.get(SCAN_TIMESTAMP) != null) {
          scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
        }

        if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
          scan.setTimeRange(
              Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
              Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
        }

        if (conf.get(SCAN_MAXVERSIONS) != null) {
          scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
        }

        if (conf.get(SCAN_CACHEDROWS) != null) {
          scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
        }

        // Block caching is off by default; full table scans generate too
        // much block cache churn.
        scan.setCacheBlocks(conf.getBoolean(SCAN_CACHEBLOCKS, false));
      } catch (Exception e) {
        LOG.error("Failed to configure the scan: " +
          StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }
}