1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.util.Map;
25 import java.util.NavigableMap;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileUtil;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.hbase.HColumnDescriptor;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HTableDescriptor;
35 import org.apache.hadoop.hbase.KeyValue;
36 import org.apache.hadoop.hbase.MultiRegionTable;
37 import org.apache.hadoop.hbase.client.HTable;
38 import org.apache.hadoop.hbase.client.Put;
39 import org.apache.hadoop.hbase.client.Result;
40 import org.apache.hadoop.hbase.client.ResultScanner;
41 import org.apache.hadoop.hbase.client.Scan;
42 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
43 import org.apache.hadoop.hbase.util.Bytes;
44 import org.apache.hadoop.mapred.MiniMRCluster;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
47
48
49
50
51
52
53 public class TestTableMapReduce extends MultiRegionTable {
54 private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
55 static final String MULTI_REGION_TABLE_NAME = "mrtest";
56 static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
57 static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
58
59
60 public TestTableMapReduce() {
61 super(Bytes.toString(INPUT_FAMILY));
62 desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
63 desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
64 desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
65 }
66
67
68
69
70 public static class ProcessContentsMapper
71 extends TableMapper<ImmutableBytesWritable, Put> {
72
73
74
75
76
77
78
79
80
81 public void map(ImmutableBytesWritable key, Result value,
82 Context context)
83 throws IOException, InterruptedException {
84 if (value.size() != 1) {
85 throw new IOException("There should only be one input column");
86 }
87 Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
88 cf = value.getMap();
89 if(!cf.containsKey(INPUT_FAMILY)) {
90 throw new IOException("Wrong input columns. Missing: '" +
91 Bytes.toString(INPUT_FAMILY) + "'.");
92 }
93
94
95 String originalValue = new String(value.getValue(INPUT_FAMILY, null),
96 HConstants.UTF8_ENCODING);
97 StringBuilder newValue = new StringBuilder(originalValue);
98 newValue.reverse();
99
100 Put outval = new Put(key.get());
101 outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
102 context.write(key, outval);
103 }
104 }
105
106
107
108
109
110
111
112 public void testMultiRegionTable()
113 throws IOException, InterruptedException, ClassNotFoundException {
114 runTestOnTable(new HTable(new Configuration(conf), MULTI_REGION_TABLE_NAME));
115 }
116
117 private void runTestOnTable(HTable table)
118 throws IOException, InterruptedException, ClassNotFoundException {
119 MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
120 Job job = null;
121 try {
122 LOG.info("Before map/reduce startup");
123 job = new Job(table.getConfiguration(), "process column contents");
124 job.setNumReduceTasks(1);
125 Scan scan = new Scan();
126 scan.addFamily(INPUT_FAMILY);
127 TableMapReduceUtil.initTableMapperJob(
128 Bytes.toString(table.getTableName()), scan,
129 ProcessContentsMapper.class, ImmutableBytesWritable.class,
130 Put.class, job);
131 TableMapReduceUtil.initTableReducerJob(
132 Bytes.toString(table.getTableName()),
133 IdentityTableReducer.class, job);
134 FileOutputFormat.setOutputPath(job, new Path("test"));
135 LOG.info("Started " + Bytes.toString(table.getTableName()));
136 job.waitForCompletion(true);
137 LOG.info("After map/reduce completion");
138
139
140 verify(Bytes.toString(table.getTableName()));
141 } finally {
142 mrCluster.shutdown();
143 if (job != null) {
144 FileUtil.fullyDelete(
145 new File(job.getConfiguration().get("hadoop.tmp.dir")));
146 }
147 }
148 }
149
150 private void verify(String tableName) throws IOException {
151 HTable table = new HTable(new Configuration(conf), tableName);
152 boolean verified = false;
153 long pause = conf.getLong("hbase.client.pause", 5 * 1000);
154 int numRetries = conf.getInt("hbase.client.retries.number", 5);
155 for (int i = 0; i < numRetries; i++) {
156 try {
157 LOG.info("Verification attempt #" + i);
158 verifyAttempt(table);
159 verified = true;
160 break;
161 } catch (NullPointerException e) {
162
163
164 LOG.debug("Verification attempt failed: " + e.getMessage());
165 }
166 try {
167 Thread.sleep(pause);
168 } catch (InterruptedException e) {
169
170 }
171 }
172 assertTrue(verified);
173 }
174
175
176
177
178
179
180
181
182
183 private void verifyAttempt(final HTable table) throws IOException, NullPointerException {
184 Scan scan = new Scan();
185 scan.addFamily(INPUT_FAMILY);
186 scan.addFamily(OUTPUT_FAMILY);
187 ResultScanner scanner = table.getScanner(scan);
188 try {
189 for (Result r : scanner) {
190 if (LOG.isDebugEnabled()) {
191 if (r.size() > 2 ) {
192 throw new IOException("Too many results, expected 2 got " +
193 r.size());
194 }
195 }
196 byte[] firstValue = null;
197 byte[] secondValue = null;
198 int count = 0;
199 for(KeyValue kv : r.list()) {
200 if (count == 0) {
201 firstValue = kv.getValue();
202 }
203 if (count == 1) {
204 secondValue = kv.getValue();
205 }
206 count++;
207 if (count == 2) {
208 break;
209 }
210 }
211
212 String first = "";
213 if (firstValue == null) {
214 throw new NullPointerException(Bytes.toString(r.getRow()) +
215 ": first value is null");
216 }
217 first = new String(firstValue, HConstants.UTF8_ENCODING);
218
219 String second = "";
220 if (secondValue == null) {
221 throw new NullPointerException(Bytes.toString(r.getRow()) +
222 ": second value is null");
223 }
224 byte[] secondReversed = new byte[secondValue.length];
225 for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
226 secondReversed[i] = secondValue[j];
227 }
228 second = new String(secondReversed, HConstants.UTF8_ENCODING);
229
230 if (first.compareTo(second) != 0) {
231 if (LOG.isDebugEnabled()) {
232 LOG.debug("second key is not the reverse of first. row=" +
233 Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
234 ", second value=" + second);
235 }
236 fail();
237 }
238 }
239 } finally {
240 scanner.close();
241 }
242 }
243
244
245
246
247 public void testAddDependencyJars() throws Exception {
248 Job job = new Job();
249 TableMapReduceUtil.addDependencyJars(job);
250 String tmpjars = job.getConfiguration().get("tmpjars");
251
252 System.err.println("tmpjars: " + tmpjars);
253 assertTrue(tmpjars.contains("zookeeper"));
254 assertFalse(tmpjars.contains("guava"));
255
256 System.err.println("appending guava jar");
257 TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
258 com.google.common.base.Function.class);
259 tmpjars = job.getConfiguration().get("tmpjars");
260 assertTrue(tmpjars.contains("guava"));
261 }
262 }