1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configurable;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.apache.hadoop.hbase.util.Bytes;
31 import org.apache.hadoop.util.StringUtils;
32
33
34
35
36 public class TableInputFormat extends TableInputFormatBase
37 implements Configurable {
38
39 private final Log LOG = LogFactory.getLog(TableInputFormat.class);
40
41
42 public static final String INPUT_TABLE = "hbase.mapreduce.inputtable";
43
44
45
46 public static final String SCAN = "hbase.mapreduce.scan";
47
48 public static final String SCAN_COLUMN_FAMILY = "hbase.mapreduce.scan.column.family";
49
50 public static final String SCAN_COLUMNS = "hbase.mapreduce.scan.columns";
51
52 public static final String SCAN_TIMESTAMP = "hbase.mapreduce.scan.timestamp";
53
54 public static final String SCAN_TIMERANGE_START = "hbase.mapreduce.scan.timerange.start";
55
56 public static final String SCAN_TIMERANGE_END = "hbase.mapreduce.scan.timerange.end";
57
58 public static final String SCAN_MAXVERSIONS = "hbase.mapreduce.scan.maxversions";
59
60 public static final String SCAN_CACHEBLOCKS = "hbase.mapreduce.scan.cacheblocks";
61
62 public static final String SCAN_CACHEDROWS = "hbase.mapreduce.scan.cachedrows";
63
64
65 private Configuration conf = null;
66
67
68
69
70
71
72
73 @Override
74 public Configuration getConf() {
75 return conf;
76 }
77
78
79
80
81
82
83
84
85
86 @Override
87 public void setConf(Configuration configuration) {
88 this.conf = configuration;
89 String tableName = conf.get(INPUT_TABLE);
90 try {
91 setHTable(new HTable(new Configuration(conf), tableName));
92 } catch (Exception e) {
93 LOG.error(StringUtils.stringifyException(e));
94 }
95
96 Scan scan = null;
97
98 if (conf.get(SCAN) != null) {
99 try {
100 scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
101 } catch (IOException e) {
102 LOG.error("An error occurred.", e);
103 }
104 } else {
105 try {
106 scan = new Scan();
107
108 if (conf.get(SCAN_COLUMNS) != null) {
109 scan.addColumns(conf.get(SCAN_COLUMNS));
110 }
111
112 if (conf.get(SCAN_COLUMN_FAMILY) != null) {
113 scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
114 }
115
116 if (conf.get(SCAN_TIMESTAMP) != null) {
117 scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
118 }
119
120 if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
121 scan.setTimeRange(
122 Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
123 Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
124 }
125
126 if (conf.get(SCAN_MAXVERSIONS) != null) {
127 scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
128 }
129
130 if (conf.get(SCAN_CACHEDROWS) != null) {
131 scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
132 }
133
134
135 scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));
136 } catch (Exception e) {
137 LOG.error(StringUtils.stringifyException(e));
138 }
139 }
140
141 setScan(scan);
142 }
143 }