1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapred;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.UnknownScannerException;
27 import org.apache.hadoop.hbase.client.HTable;
28 import org.apache.hadoop.hbase.client.Result;
29 import org.apache.hadoop.hbase.client.ResultScanner;
30 import org.apache.hadoop.hbase.client.Scan;
31 import org.apache.hadoop.hbase.filter.Filter;
32 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.apache.hadoop.hbase.util.Writables;
35
36 import org.apache.hadoop.util.StringUtils;
37
38
39
40
41
42 public class TableRecordReaderImpl {
43 static final Log LOG = LogFactory.getLog(TableRecordReaderImpl.class);
44
45 private byte [] startRow;
46 private byte [] endRow;
47 private byte [] lastRow;
48 private Filter trrRowFilter;
49 private ResultScanner scanner;
50 private HTable htable;
51 private byte [][] trrInputColumns;
52
53
54
55
56
57
58
59 public void restart(byte[] firstRow) throws IOException {
60 if ((endRow != null) && (endRow.length > 0)) {
61 if (trrRowFilter != null) {
62 Scan scan = new Scan(firstRow, endRow);
63 scan.addColumns(trrInputColumns);
64 scan.setFilter(trrRowFilter);
65 scan.setCacheBlocks(false);
66 this.scanner = this.htable.getScanner(scan);
67 } else {
68 LOG.debug("TIFB.restart, firstRow: " +
69 Bytes.toStringBinary(firstRow) + ", endRow: " +
70 Bytes.toStringBinary(endRow));
71 Scan scan = new Scan(firstRow, endRow);
72 scan.addColumns(trrInputColumns);
73 this.scanner = this.htable.getScanner(scan);
74 }
75 } else {
76 LOG.debug("TIFB.restart, firstRow: " +
77 Bytes.toStringBinary(firstRow) + ", no endRow");
78
79 Scan scan = new Scan(firstRow);
80 scan.addColumns(trrInputColumns);
81
82 this.scanner = this.htable.getScanner(scan);
83 }
84 }
85
86
87
88
89
90
91 public void init() throws IOException {
92 restart(startRow);
93 }
94
95 byte[] getStartRow() {
96 return this.startRow;
97 }
98
99
100
101 public void setHTable(HTable htable) {
102 this.htable = htable;
103 }
104
105
106
107
108 public void setInputColumns(final byte [][] inputColumns) {
109 this.trrInputColumns = inputColumns;
110 }
111
112
113
114
115 public void setStartRow(final byte [] startRow) {
116 this.startRow = startRow;
117 }
118
119
120
121
122
123 public void setEndRow(final byte [] endRow) {
124 this.endRow = endRow;
125 }
126
127
128
129
130 public void setRowFilter(Filter rowFilter) {
131 this.trrRowFilter = rowFilter;
132 }
133
134 public void close() {
135 this.scanner.close();
136 }
137
138
139
140
141
142
143 public ImmutableBytesWritable createKey() {
144 return new ImmutableBytesWritable();
145 }
146
147
148
149
150
151
152 public Result createValue() {
153 return new Result();
154 }
155
156 public long getPos() {
157
158
159 return 0;
160 }
161
162 public float getProgress() {
163
164 return 0;
165 }
166
167
168
169
170
171
172
173 public boolean next(ImmutableBytesWritable key, Result value)
174 throws IOException {
175 Result result;
176 try {
177 result = this.scanner.next();
178 } catch (UnknownScannerException e) {
179 LOG.debug("recovered from " + StringUtils.stringifyException(e));
180 restart(lastRow);
181 this.scanner.next();
182 result = this.scanner.next();
183 }
184
185 if (result != null && result.size() > 0) {
186 key.set(result.getRow());
187 lastRow = key.get();
188 Writables.copyWritable(result, value);
189 return true;
190 }
191 return false;
192 }
193 }