1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.hbase.HBaseConfiguration;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.Scan;
32 import org.apache.hadoop.hbase.filter.*;
33 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34 import org.apache.hadoop.hbase.util.Bytes;
35 import org.apache.hadoop.mapreduce.Job;
36 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
37 import org.apache.hadoop.util.GenericOptionsParser;
38 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
39 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
40 import org.apache.hadoop.fs.Path;
41 import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
42 import org.apache.hadoop.io.IntWritable;
43 import org.apache.hadoop.mapreduce.Reducer;
44 import org.apache.hadoop.io.Text;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64 public class CellCounter {
65 private static final Log LOG =
66 LogFactory.getLog(CellCounter.class.getName());
67
68
69
70
71
72 static final String NAME = "CellCounter";
73
74
75
76
77 static class CellCounterMapper
78 extends TableMapper<Text, IntWritable> {
79
80
81
82 public static enum Counters {
83 ROWS
84 }
85
86
87
88
89
90
91
92
93
94
95
96
97 @Override
98 public void map(ImmutableBytesWritable row, Result values,
99 Context context)
100 throws IOException {
101 String currentFamilyName = null;
102 String currentQualifierName = null;
103 String currentRowKey = null;
104 Configuration config = context.getConfiguration();
105 String separator = config.get("ReportSeparator",":");
106
107 try {
108 if (values != null) {
109 context.getCounter(Counters.ROWS).increment(1);
110 context.write(new Text("Total ROWS"), new IntWritable(1));
111 }
112
113 for (KeyValue value : values.list()) {
114 currentRowKey = Bytes.toStringBinary(value.getRow());
115 String thisRowFamilyName = Bytes.toStringBinary(value.getFamily());
116 if (thisRowFamilyName != null &&
117 !thisRowFamilyName.equals(currentFamilyName)) {
118 currentFamilyName = thisRowFamilyName;
119 context.getCounter("CF", thisRowFamilyName).increment(1);
120 context.write(new Text("Total Families Across all Rows"),
121 new IntWritable(1));
122 context.write(new Text(thisRowFamilyName), new IntWritable(1));
123 }
124 String thisRowQualifierName =
125 thisRowFamilyName + separator + Bytes.toStringBinary(value.getQualifier());
126 if (thisRowQualifierName != null &&
127 !thisRowQualifierName.equals(currentQualifierName)) {
128 currentQualifierName = thisRowQualifierName;
129 context.getCounter("CFQL", thisRowQualifierName).increment(1);
130 context.write(new Text("Total Qualifiers across all Rows"),
131 new IntWritable(1));
132 context.write(new Text(thisRowQualifierName), new IntWritable(1));
133
134 context.getCounter("QL_VERSIONS", currentRowKey + separator +
135 thisRowQualifierName).increment(1);
136 context.write(new Text(currentRowKey + separator + thisRowQualifierName +
137 "_Versions"), new IntWritable(1));
138
139 } else {
140
141 currentQualifierName = thisRowQualifierName;
142 context.getCounter("QL_VERSIONS", currentRowKey + separator +
143 thisRowQualifierName).increment(1);
144 context.write(new Text(currentRowKey + separator + thisRowQualifierName + "_Versions"),
145 new IntWritable(1));
146 }
147 }
148 } catch (InterruptedException e) {
149 e.printStackTrace();
150 }
151 }
152 }
153
154 static class IntSumReducer<Key> extends Reducer<Key, IntWritable,
155 Key, IntWritable> {
156
157 private IntWritable result = new IntWritable();
158 public void reduce(Key key, Iterable<IntWritable> values,
159 Context context)
160 throws IOException, InterruptedException {
161 int sum = 0;
162 for (IntWritable val : values) {
163 sum += val.get();
164 }
165 result.set(sum);
166 context.write(key, result);
167 }
168 }
169
170
171
172
173
174
175
176
177
178 public static Job createSubmittableJob(Configuration conf, String[] args)
179 throws IOException {
180 String tableName = args[0];
181 Path outputDir = new Path(args[1]);
182 String reportSeparatorString = (args.length > 2) ? args[2]: ":";
183 conf.set("ReportSeparator", reportSeparatorString);
184 Job job = new Job(conf, NAME + "_" + tableName);
185 job.setJarByClass(CellCounter.class);
186 Scan scan = getConfiguredScanForJob(conf, args);
187 TableMapReduceUtil.initTableMapperJob(tableName, scan,
188 CellCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
189 job.setNumReduceTasks(1);
190 job.setMapOutputKeyClass(Text.class);
191 job.setMapOutputValueClass(IntWritable.class);
192 job.setOutputFormatClass(TextOutputFormat.class);
193 job.setOutputKeyClass(Text.class);
194 job.setOutputValueClass(IntWritable.class);
195 FileOutputFormat.setOutputPath(job, outputDir);
196 job.setReducerClass(IntSumReducer.class);
197 return job;
198 }
199
200 private static Scan getConfiguredScanForJob(Configuration conf, String[] args) throws IOException {
201 Scan s = new Scan();
202
203 s.setMaxVersions(Integer.MAX_VALUE);
204 s.setCacheBlocks(false);
205
206 if (conf.get(TableInputFormat.SCAN_COLUMN_FAMILY) != null) {
207 s.addFamily(Bytes.toBytes(conf.get(TableInputFormat.SCAN_COLUMN_FAMILY)));
208 }
209
210 Filter rowFilter = getRowFilter(args);
211 if (rowFilter!= null) {
212 LOG.info("Setting Row Filter for counter.");
213 s.setFilter(rowFilter);
214 }
215 return s;
216 }
217
218
219 private static Filter getRowFilter(String[] args) {
220 Filter rowFilter = null;
221 String filterCriteria = (args.length > 3) ? args[3]: null;
222 if (filterCriteria == null) return null;
223 if (filterCriteria.startsWith("^")) {
224 String regexPattern = filterCriteria.substring(1, filterCriteria.length());
225 rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexPattern));
226 } else {
227 rowFilter = new PrefixFilter(Bytes.toBytes(filterCriteria));
228 }
229 return rowFilter;
230 }
231
232
233
234
235
236
237
238 public static void main(String[] args) throws Exception {
239 Configuration conf = HBaseConfiguration.create();
240 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
241 if (otherArgs.length < 1) {
242 System.err.println("ERROR: Wrong number of parameters: " + args.length);
243 System.err.println("Usage: CellCounter <tablename> <outputDir> <reportSeparator> " +
244 "[^[regex pattern] or [Prefix] for row filter]] ");
245 System.err.println(" Note: -D properties will be applied to the conf used. ");
246 System.err.println(" Additionally, the following SCAN properties can be specified");
247 System.err.println(" to get fine grained control on what is counted..");
248 System.err.println(" -D " + TableInputFormat.SCAN_COLUMN_FAMILY + "=<familyName>");
249 System.err.println(" <reportSeparator> parameter can be used to override the default report separator " +
250 "string : used to separate the rowId/column family name and qualifier name.");
251 System.err.println(" [^[regex pattern] or [Prefix] parameter can be used to limit the cell counter count " +
252 "operation to a limited subset of rows from the table based on regex or prefix pattern.");
253 System.exit(-1);
254 }
255 Job job = createSubmittableJob(conf, otherArgs);
256 System.exit(job.waitForCompletion(true) ? 0 : 1);
257 }
258 }