1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.List;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Result;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
32 import org.apache.hadoop.hbase.util.Bytes;
33 import org.apache.hadoop.hbase.util.Pair;
34 import org.apache.hadoop.mapreduce.InputFormat;
35 import org.apache.hadoop.mapreduce.InputSplit;
36 import org.apache.hadoop.mapreduce.JobContext;
37 import org.apache.hadoop.mapreduce.RecordReader;
38 import org.apache.hadoop.mapreduce.TaskAttemptContext;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 public abstract class TableInputFormatBase
69 extends InputFormat<ImmutableBytesWritable, Result> {
70
71 final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
72
73
74 private Scan scan = null;
75
76 private HTable table = null;
77
78 private TableRecordReader tableRecordReader = null;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 @Override
94 public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
95 InputSplit split, TaskAttemptContext context)
96 throws IOException {
97 if (table == null) {
98 throw new IOException("Cannot create a record reader because of a" +
99 " previous error. Please look at the previous logs lines from" +
100 " the task's full log for more details.");
101 }
102 TableSplit tSplit = (TableSplit) split;
103 TableRecordReader trr = this.tableRecordReader;
104
105 if (trr == null) {
106 trr = new TableRecordReader();
107 }
108 Scan sc = new Scan(this.scan);
109 sc.setStartRow(tSplit.getStartRow());
110 sc.setStopRow(tSplit.getEndRow());
111 trr.setScan(sc);
112 trr.setHTable(table);
113 trr.init();
114 return trr;
115 }
116
117
118
119
120
121
122
123
124
125
126
127 @Override
128 public List<InputSplit> getSplits(JobContext context) throws IOException {
129 if (table == null) {
130 throw new IOException("No table was provided.");
131 }
132 Pair<byte[][], byte[][]> keys = table.getStartEndKeys();
133 if (keys == null || keys.getFirst() == null ||
134 keys.getFirst().length == 0) {
135 throw new IOException("Expecting at least one region.");
136 }
137 int count = 0;
138 List<InputSplit> splits = new ArrayList<InputSplit>(keys.getFirst().length);
139 for (int i = 0; i < keys.getFirst().length; i++) {
140 if ( !includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) {
141 continue;
142 }
143 String regionLocation = table.getRegionLocation(keys.getFirst()[i]).
144 getServerAddress().getHostname();
145 byte[] startRow = scan.getStartRow();
146 byte[] stopRow = scan.getStopRow();
147
148 if ((startRow.length == 0 || keys.getSecond()[i].length == 0 ||
149 Bytes.compareTo(startRow, keys.getSecond()[i]) < 0) &&
150 (stopRow.length == 0 ||
151 Bytes.compareTo(stopRow, keys.getFirst()[i]) > 0)) {
152 byte[] splitStart = startRow.length == 0 ||
153 Bytes.compareTo(keys.getFirst()[i], startRow) >= 0 ?
154 keys.getFirst()[i] : startRow;
155 byte[] splitStop = (stopRow.length == 0 ||
156 Bytes.compareTo(keys.getSecond()[i], stopRow) <= 0) &&
157 keys.getSecond()[i].length > 0 ?
158 keys.getSecond()[i] : stopRow;
159 InputSplit split = new TableSplit(table.getTableName(),
160 splitStart, splitStop, regionLocation);
161 splits.add(split);
162 if (LOG.isDebugEnabled())
163 LOG.debug("getSplits: split -> " + (count++) + " -> " + split);
164 }
165 }
166 return splits;
167 }
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191 protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
192 return true;
193 }
194
195
196
197
198 protected HTable getHTable() {
199 return this.table;
200 }
201
202
203
204
205
206
207 protected void setHTable(HTable table) {
208 this.table = table;
209 }
210
211
212
213
214
215
216 public Scan getScan() {
217 if (this.scan == null) this.scan = new Scan();
218 return scan;
219 }
220
221
222
223
224
225
226 public void setScan(Scan scan) {
227 this.scan = scan;
228 }
229
230
231
232
233
234
235
236 protected void setTableRecordReader(TableRecordReader tableRecordReader) {
237 this.tableRecordReader = tableRecordReader;
238 }
239 }