package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * A job with just a map phase that counts the rows of an HBase table. The
 * mapper emits no output records; it increments the ROWS counter once for
 * every row that has at least one non-empty column, and the total is
 * reported through the job counters.
 *
 * Usage: {@code RowCounter <tablename> [<column1> <column2>...]}
 */
public class RowCounter {

  /** Name of this 'program'. */
  static final String NAME = "rowcounter";

  /**
   * Mapper that runs the count. It produces no output records; it only
   * increments the ROWS counter for qualifying rows.
   */
  static class RowCounterMapper
  extends TableMapper<ImmutableBytesWritable, Result> {

    /** Counter enumeration to count the actual rows. */
    public static enum Counters {ROWS}

    /**
     * Counts the row if it has at least one non-empty cell.
     *
     * @param row  The current table row key.
     * @param values  The cells of the current row.
     * @param context  The current task context.
     * @throws IOException When reading the data fails.
     */
    @Override
    public void map(ImmutableBytesWritable row, Result values,
      Context context)
    throws IOException {
      for (KeyValue value: values.list()) {
        if (value.getValue().length > 0) {
          context.getCounter(Counters.ROWS).increment(1);
          break;
        }
      }
    }
  }

  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters: the table name, optionally
   *   followed by column family names or family:qualifier columns.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmittableJob(Configuration conf, String[] args)
  throws IOException {
    String tableName = args[0];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(RowCounter.class);

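    // Concatenate the optional column arguments (everything after the table
    // name) into one space-separated string so they can be parsed below.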
    StringBuilder sb = new StringBuilder();
    final int columnoffset = 1;
    for (int i = columnoffset; i < args.length; i++) {
      if (i > columnoffset) {
        sb.append(" ");
      }
      sb.append(args[i]);
    }
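    // Only the first KeyValue of each row is needed to count it, so the scan
    // applies FirstKeyOnlyFilter to keep the amount of data transferred low.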
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    if (sb.length() > 0) {
      for (String columnName : sb.toString().split(" ")) {
        String[] fields = columnName.split(":");
        if (fields.length == 1) {
          scan.addFamily(Bytes.toBytes(fields[0]));
        } else {
          scan.addColumn(Bytes.toBytes(fields[0]), Bytes.toBytes(fields[1]));
        }
      }
    }

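    // No output is written; the row count is reported through the ROWS
    // counter, so the job needs no reduce phase and uses NullOutputFormat.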
    job.setOutputFormatClass(NullOutputFormat.class);
    TableMapReduceUtil.initTableMapperJob(tableName, scan,
      RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 1) {
      System.err.println("ERROR: Wrong number of parameters: " + otherArgs.length);
      System.err.println("Usage: RowCounter <tablename> [<column1> <column2>...]");
      System.exit(-1);
    }
    Job job = createSubmittableJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}