/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

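/**
 * Test Map/Reduce job over HBase tables. The map/reduce process being
 * tested is simple: take every row in the table, reverse the value of a
 * particular cell, and write it back to the table.
 */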
public class TestTableMapReduce extends MultiRegionTable {
  private static final Log LOG =
    LogFactory.getLog(TestTableMapReduce.class.getName());

  static final String MULTI_REGION_TABLE_NAME = "mrtest";
  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");

  private static final byte[][] columns = new byte[][] {
    INPUT_FAMILY,
    OUTPUT_FAMILY
  };

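  /** Constructor: set up a table descriptor with the input and output column families. */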
  public TestTableMapReduce() {
    super(Bytes.toString(INPUT_FAMILY));
    desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
  }

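  /**
   * Mapper that reverses the value of the input column and passes the
   * resulting Put on to reduce.
   */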
  public static class ProcessContentsMapper extends MapReduceBase
      implements TableMap<ImmutableBytesWritable, Put> {

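    /**
     * Pass the key, and reversed value to reduce.
     * @param key the row key
     * @param value the scanned row, expected to hold a single input cell
     * @param output collector for the emitted Put
     * @param reporter progress reporter
     * @throws IOException if the row does not carry exactly one input column
     */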
    public void map(ImmutableBytesWritable key, Result value,
      OutputCollector<ImmutableBytesWritable, Put> output,
      Reporter reporter)
    throws IOException {
      if (value.size() != 1) {
        throw new IOException("There should only be one input column");
      }
      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
        cf = value.getMap();
      if (!cf.containsKey(INPUT_FAMILY)) {
        throw new IOException("Wrong input columns. Missing: '" +
          Bytes.toString(INPUT_FAMILY) + "'.");
      }

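      // Get the original value and reverse it.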
      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
        HConstants.UTF8_ENCODING);
      StringBuilder newValue = new StringBuilder(originalValue);
      newValue.reverse();

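      // Now set the value to be collected.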
      Put outval = new Put(key.get());
      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
      output.collect(key, outval);
    }
  }

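  /**
   * Test a map/reduce against a multi-region table.
   * @throws IOException
   */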
  public void testMultiRegionTable() throws IOException {
    runTestOnTable(new HTable(conf, MULTI_REGION_TABLE_NAME));
  }

  private void runTestOnTable(HTable table) throws IOException {
    MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);

    JobConf jobConf = null;
    try {
      LOG.info("Before map/reduce startup");
      jobConf = new JobConf(conf, TestTableMapReduce.class);
      jobConf.setJobName("process column contents");
      jobConf.setNumReduceTasks(1);
      TableMapReduceUtil.initTableMapJob(Bytes.toString(table.getTableName()),
        Bytes.toString(INPUT_FAMILY), ProcessContentsMapper.class,
        ImmutableBytesWritable.class, Put.class, jobConf);
      TableMapReduceUtil.initTableReduceJob(Bytes.toString(table.getTableName()),
        IdentityTableReduce.class, jobConf);

      LOG.info("Started " + Bytes.toString(table.getTableName()));
      JobClient.runJob(jobConf);
      LOG.info("After map/reduce completion");

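      // Verify the map/reduce results.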
      verify(Bytes.toString(table.getTableName()));
    } finally {
      mrCluster.shutdown();
      if (jobConf != null) {
        FileUtil.fullyDelete(new File(jobConf.get("hadoop.tmp.dir")));
      }
    }
  }

  private void verify(String tableName) throws IOException {
    HTable table = new HTable(conf, tableName);
    boolean verified = false;
    long pause = conf.getLong("hbase.client.pause", 5 * 1000);
    int numRetries = conf.getInt("hbase.client.retries.number", 5);
    for (int i = 0; i < numRetries; i++) {
      try {
        LOG.info("Verification attempt #" + i);
        verifyAttempt(table);
        verified = true;
        break;
      } catch (NullPointerException e) {
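        // A NullPointerException means the table has not been fully
        // updated yet, so sleep and retry until retries are exhausted.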
        LOG.debug("Verification attempt failed: " + e.getMessage());
      }
      try {
        Thread.sleep(pause);
      } catch (InterruptedException e) {
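        // Ignored; fall through and start the next verification attempt.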
      }
    }
    assertTrue(verified);
  }

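  /**
   * Look at every value of the map/reduce output and verify that the
   * output value is the reverse of the input value.
   * @param table the table to scan
   * @throws IOException
   * @throws NullPointerException if a scanned row is missing a value,
   * caught and retried by verify
   */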
  private void verifyAttempt(final HTable table)
  throws IOException, NullPointerException {
    Scan scan = new Scan();
    scan.addColumns(columns);
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        if (LOG.isDebugEnabled()) {
          if (r.size() > 2) {
            throw new IOException("Too many results, expected 2 got " +
              r.size());
          }
        }
        byte[] firstValue = null;
        byte[] secondValue = null;
        int count = 0;
        for (KeyValue kv : r.list()) {
          if (count == 0) {
            firstValue = kv.getValue();
          }
          if (count == 1) {
            secondValue = kv.getValue();
          }
          count++;
          if (count == 2) {
            break;
          }
        }

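        // Check that both cells are present, then compare the original
        // value against the re-reversed output value.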
        if (firstValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": first value is null");
        }
        String first = new String(firstValue, HConstants.UTF8_ENCODING);

        if (secondValue == null) {
          throw new NullPointerException(Bytes.toString(r.getRow()) +
            ": second value is null");
        }
        byte[] secondReversed = new byte[secondValue.length];
        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
          secondReversed[i] = secondValue[j];
        }
        String second = new String(secondReversed, HConstants.UTF8_ENCODING);

        if (first.compareTo(second) != 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("second key is not the reverse of first. row=" +
              Bytes.toString(r.getRow()) + ", first value=" + first +
              ", second value=" + second);
          }
          fail();
        }
      }
    } finally {
      scanner.close();
    }
  }
}