1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapred;
21
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
36
37
38
39
40
41 @Deprecated
42 public class GroupingTableMap
43 extends MapReduceBase
44 implements TableMap<ImmutableBytesWritable,Result> {
45
46
47
48
49
50 public static final String GROUP_COLUMNS =
51 "hbase.mapred.groupingtablemap.columns";
52
53 protected byte [][] columns;
54
55
56
57
58
59
60
61
62
63
64
65
66 @SuppressWarnings("unchecked")
67 public static void initJob(String table, String columns, String groupColumns,
68 Class<? extends TableMap> mapper, JobConf job) {
69
70 TableMapReduceUtil.initTableMapJob(table, columns, mapper,
71 ImmutableBytesWritable.class, Result.class, job);
72 job.set(GROUP_COLUMNS, groupColumns);
73 }
74
75 @Override
76 public void configure(JobConf job) {
77 super.configure(job);
78 String[] cols = job.get(GROUP_COLUMNS, "").split(" ");
79 columns = new byte[cols.length][];
80 for(int i = 0; i < cols.length; i++) {
81 columns[i] = Bytes.toBytes(cols[i]);
82 }
83 }
84
85
86
87
88
89
90
91
92
93
94
95
96 public void map(ImmutableBytesWritable key, Result value,
97 OutputCollector<ImmutableBytesWritable,Result> output,
98 Reporter reporter) throws IOException {
99
100 byte[][] keyVals = extractKeyValues(value);
101 if(keyVals != null) {
102 ImmutableBytesWritable tKey = createGroupKey(keyVals);
103 output.collect(tKey, value);
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116 protected byte[][] extractKeyValues(Result r) {
117 byte[][] keyVals = null;
118 ArrayList<byte[]> foundList = new ArrayList<byte[]>();
119 int numCols = columns.length;
120 if (numCols > 0) {
121 for (KeyValue value: r.list()) {
122 byte [] column = KeyValue.makeColumn(value.getFamily(),
123 value.getQualifier());
124 for (int i = 0; i < numCols; i++) {
125 if (Bytes.equals(column, columns[i])) {
126 foundList.add(value.getValue());
127 break;
128 }
129 }
130 }
131 if(foundList.size() == numCols) {
132 keyVals = foundList.toArray(new byte[numCols][]);
133 }
134 }
135 return keyVals;
136 }
137
138
139
140
141
142
143
144
145 protected ImmutableBytesWritable createGroupKey(byte[][] vals) {
146 if(vals == null) {
147 return null;
148 }
149 StringBuilder sb = new StringBuilder();
150 for(int i = 0; i < vals.length; i++) {
151 if(i > 0) {
152 sb.append(" ");
153 }
154 try {
155 sb.append(new String(vals[i], HConstants.UTF8_ENCODING));
156 } catch (UnsupportedEncodingException e) {
157 throw new RuntimeException(e);
158 }
159 }
160 return new ImmutableBytesWritable(Bytes.toBytes(sb.toString()));
161 }
162 }