1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.mapreduce.replication;
21
22 import java.io.IOException;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.conf.Configuration;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.HConstants;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.client.HConnection;
31 import org.apache.hadoop.hbase.client.HConnectionManager;
32 import org.apache.hadoop.hbase.client.HTable;
33 import org.apache.hadoop.hbase.client.Put;
34 import org.apache.hadoop.hbase.client.Result;
35 import org.apache.hadoop.hbase.client.ResultScanner;
36 import org.apache.hadoop.hbase.client.Scan;
37 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
38 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
39 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
40 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
41 import org.apache.hadoop.hbase.mapreduce.TableMapper;
42 import org.apache.hadoop.hbase.replication.ReplicationPeer;
43 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
44 import org.apache.hadoop.hbase.util.Bytes;
45 import org.apache.hadoop.mapreduce.Job;
46 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
47 import org.apache.zookeeper.KeeperException;
48
49
50
51
52
53
54
55
56
57
58
59 public class VerifyReplication {
60
61 private static final Log LOG =
62 LogFactory.getLog(VerifyReplication.class);
63
64 public final static String NAME = "verifyrep";
65 static long startTime = 0;
66 static long endTime = 0;
67 static String tableName = null;
68 static String families = null;
69 static String peerId = null;
70
71
72
73
74 public static class Verifier
75 extends TableMapper<ImmutableBytesWritable, Put> {
76
77 public static enum Counters {GOODROWS, BADROWS}
78
79 private ResultScanner replicatedScanner;
80
81
82
83
84
85
86
87
88
89 @Override
90 public void map(ImmutableBytesWritable row, Result value,
91 Context context)
92 throws IOException {
93 if (replicatedScanner == null) {
94 Configuration conf = context.getConfiguration();
95 Scan scan = new Scan();
96 scan.setCaching(conf.getInt(TableInputFormat.SCAN_CACHEDROWS, 1));
97 long startTime = conf.getLong(NAME + ".startTime", 0);
98 long endTime = conf.getLong(NAME + ".endTime", 0);
99 String families = conf.get(NAME + ".families", null);
100 if(families != null) {
101 String[] fams = families.split(",");
102 for(String fam : fams) {
103 scan.addFamily(Bytes.toBytes(fam));
104 }
105 }
106 if (startTime != 0) {
107 scan.setTimeRange(startTime,
108 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
109 }
110 try {
111 HConnection conn = HConnectionManager.getConnection(conf);
112 ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
113 conn.getZooKeeperWatcher());
114 ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
115 HTable replicatedTable = new HTable(peer.getConfiguration(),
116 conf.get(NAME+".tableName"));
117 scan.setStartRow(value.getRow());
118 replicatedScanner = replicatedTable.getScanner(scan);
119 } catch (KeeperException e) {
120 throw new IOException("Got a ZK exception", e);
121 }
122 }
123 Result res = replicatedScanner.next();
124 try {
125 Result.compareResults(value, res);
126 context.getCounter(Counters.GOODROWS).increment(1);
127 } catch (Exception e) {
128 LOG.warn("Bad row", e);
129 context.getCounter(Counters.BADROWS).increment(1);
130 }
131 }
132
133 protected void cleanup(Context context) {
134 replicatedScanner.close();
135 }
136 }
137
138
139
140
141
142
143
144
145
146 public static Job createSubmittableJob(Configuration conf, String[] args)
147 throws IOException {
148 if (!doCommandLine(args)) {
149 return null;
150 }
151 if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
152 throw new IOException("Replication needs to be enabled to verify it.");
153 }
154 try {
155 HConnection conn = HConnectionManager.getConnection(conf);
156 ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
157 conn.getZooKeeperWatcher());
158
159 ReplicationPeer peer = zk.getPeer(peerId);
160 if (peer == null) {
161 throw new IOException("Couldn't get access to the slave cluster," +
162 "please see the log");
163 }
164 } catch (KeeperException ex) {
165 throw new IOException("Couldn't get access to the slave cluster" +
166 " because: ", ex);
167 }
168 conf.set(NAME+".peerId", peerId);
169 conf.set(NAME+".tableName", tableName);
170 conf.setLong(NAME+".startTime", startTime);
171 conf.setLong(NAME+".endTime", endTime);
172 if (families != null) {
173 conf.set(NAME+".families", families);
174 }
175 Job job = new Job(conf, NAME + "_" + tableName);
176 job.setJarByClass(VerifyReplication.class);
177
178 Scan scan = new Scan();
179 if (startTime != 0) {
180 scan.setTimeRange(startTime,
181 endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
182 }
183 if(families != null) {
184 String[] fams = families.split(",");
185 for(String fam : fams) {
186 scan.addFamily(Bytes.toBytes(fam));
187 }
188 }
189 TableMapReduceUtil.initTableMapperJob(tableName, scan,
190 Verifier.class, null, null, job);
191 job.setOutputFormatClass(NullOutputFormat.class);
192 job.setNumReduceTasks(0);
193 return job;
194 }
195
196 private static boolean doCommandLine(final String[] args) {
197 if (args.length < 2) {
198 printUsage(null);
199 return false;
200 }
201 try {
202 for (int i = 0; i < args.length; i++) {
203 String cmd = args[i];
204 if (cmd.equals("-h") || cmd.startsWith("--h")) {
205 printUsage(null);
206 return false;
207 }
208
209 final String startTimeArgKey = "--starttime=";
210 if (cmd.startsWith(startTimeArgKey)) {
211 startTime = Long.parseLong(cmd.substring(startTimeArgKey.length()));
212 continue;
213 }
214
215 final String endTimeArgKey = "--endtime=";
216 if (cmd.startsWith(endTimeArgKey)) {
217 endTime = Long.parseLong(cmd.substring(endTimeArgKey.length()));
218 continue;
219 }
220
221 final String familiesArgKey = "--families=";
222 if (cmd.startsWith(familiesArgKey)) {
223 families = cmd.substring(familiesArgKey.length());
224 continue;
225 }
226
227 if (i == args.length-2) {
228 peerId = cmd;
229 }
230
231 if (i == args.length-1) {
232 tableName = cmd;
233 }
234 }
235 } catch (Exception e) {
236 e.printStackTrace();
237 printUsage("Can't start because " + e.getMessage());
238 return false;
239 }
240 return true;
241 }
242
243
244
245
246 private static void printUsage(final String errorMsg) {
247 if (errorMsg != null && errorMsg.length() > 0) {
248 System.err.println("ERROR: " + errorMsg);
249 }
250 System.err.println("Usage: verifyrep [--starttime=X]" +
251 " [--stoptime=Y] [--families=A] <peerid> <tablename>");
252 System.err.println();
253 System.err.println("Options:");
254 System.err.println(" starttime beginning of the time range");
255 System.err.println(" without endtime means from starttime to forever");
256 System.err.println(" stoptime end of the time range");
257 System.err.println(" families comma-separated list of families to copy");
258 System.err.println();
259 System.err.println("Args:");
260 System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
261 System.err.println(" tablename Name of the table to verify");
262 System.err.println();
263 System.err.println("Examples:");
264 System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
265 System.err.println(" $ bin/hbase " +
266 "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
267 " --starttime=1265875194289 --stoptime=1265878794289 5 TestTable ");
268 }
269
270
271
272
273
274
275
276 public static void main(String[] args) throws Exception {
277 Configuration conf = HBaseConfiguration.create();
278 Job job = createSubmittableJob(conf, args);
279 if (job != null) {
280 System.exit(job.waitForCompletion(true) ? 0 : 1);
281 }
282 }
283 }