1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.util.List;
23 import java.util.TreeSet;
24
25 import org.apache.hadoop.hbase.KeyValue;
26 import org.apache.hadoop.hbase.client.Put;
27 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
28 import org.apache.hadoop.mapreduce.Reducer;
29
30
31
32
33
34
35
36
37
38 public class PutSortReducer extends
39 Reducer<ImmutableBytesWritable, Put, ImmutableBytesWritable, KeyValue> {
40
41 @Override
42 protected void reduce(
43 ImmutableBytesWritable row,
44 java.lang.Iterable<Put> puts,
45 Reducer<ImmutableBytesWritable, Put,
46 ImmutableBytesWritable, KeyValue>.Context context)
47 throws java.io.IOException, InterruptedException
48 {
49 TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
50
51 for (Put p : puts) {
52 for (List<KeyValue> kvs : p.getFamilyMap().values()) {
53 for (KeyValue kv : kvs) {
54 map.add(kv.clone());
55 }
56 }
57 }
58 context.setStatus("Read " + map.getClass());
59 int index = 0;
60 for (KeyValue kv : map) {
61 context.write(row, kv);
62 if (index > 0 && index % 100 == 0)
63 context.setStatus("Wrote " + index);
64 }
65 }
66 }