1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.client.HTable;
27 import org.apache.hadoop.hbase.client.Result;
28 import org.apache.hadoop.hbase.client.ResultScanner;
29 import org.apache.hadoop.hbase.client.Scan;
30 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
31 import org.apache.hadoop.util.StringUtils;
32
33
34
35
36
37 public class TableRecordReaderImpl {
38
39
40 static final Log LOG = LogFactory.getLog(TableRecordReader.class);
41
42 private ResultScanner scanner = null;
43 private Scan scan = null;
44 private HTable htable = null;
45 private byte[] lastRow = null;
46 private ImmutableBytesWritable key = null;
47 private Result value = null;
48
49
50
51
52
53
54
55 public void restart(byte[] firstRow) throws IOException {
56 Scan newScan = new Scan(scan);
57 newScan.setStartRow(firstRow);
58 this.scanner = this.htable.getScanner(newScan);
59 }
60
61
62
63
64
65
66 public void init() throws IOException {
67 restart(scan.getStartRow());
68 }
69
70
71
72
73
74
75 public void setHTable(HTable htable) {
76 this.htable = htable;
77 }
78
79
80
81
82
83
84 public void setScan(Scan scan) {
85 this.scan = scan;
86 }
87
88
89
90
91
92
93 public void close() {
94 this.scanner.close();
95 }
96
97
98
99
100
101
102
103
104 public ImmutableBytesWritable getCurrentKey() throws IOException,
105 InterruptedException {
106 return key;
107 }
108
109
110
111
112
113
114
115
116 public Result getCurrentValue() throws IOException, InterruptedException {
117 return value;
118 }
119
120
121
122
123
124
125
126
127
128 public boolean nextKeyValue() throws IOException, InterruptedException {
129 if (key == null) key = new ImmutableBytesWritable();
130 if (value == null) value = new Result();
131 try {
132 value = this.scanner.next();
133 } catch (IOException e) {
134 LOG.debug("recovered from " + StringUtils.stringifyException(e));
135 if (lastRow == null) {
136 LOG.warn("We are restarting the first next() invocation," +
137 " if your mapper's restarted a few other times like this" +
138 " then you should consider killing this job and investigate" +
139 " why it's taking so long.");
140 lastRow = scan.getStartRow();
141 }
142 restart(lastRow);
143 scanner.next();
144 value = scanner.next();
145 }
146 if (value != null && value.size() > 0) {
147 key.set(value.getRow());
148 lastRow = key.get();
149 return true;
150 }
151 return false;
152 }
153
154
155
156
157
158
159 public float getProgress() {
160
161 return 0;
162 }
163
164 }