1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication;
21
22 import static org.junit.Assert.assertArrayEquals;
23 import static org.junit.Assert.assertEquals;
24 import static org.junit.Assert.fail;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.hbase.HBaseConfiguration;
30 import org.apache.hadoop.hbase.HBaseTestingUtility;
31 import org.apache.hadoop.hbase.HColumnDescriptor;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.KeyValue;
35 import org.apache.hadoop.hbase.UnknownScannerException;
36 import org.apache.hadoop.hbase.client.Delete;
37 import org.apache.hadoop.hbase.client.Get;
38 import org.apache.hadoop.hbase.client.HBaseAdmin;
39 import org.apache.hadoop.hbase.client.HTable;
40 import org.apache.hadoop.hbase.client.Put;
41 import org.apache.hadoop.hbase.client.Result;
42 import org.apache.hadoop.hbase.client.ResultScanner;
43 import org.apache.hadoop.hbase.client.Scan;
44 import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
45 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
46 import org.apache.hadoop.hbase.util.Bytes;
47 import org.apache.hadoop.hbase.util.JVMClusterUtil;
48 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
49 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
50 import org.apache.hadoop.mapreduce.Job;
51 import org.junit.AfterClass;
52 import org.junit.Before;
53 import org.junit.BeforeClass;
54 import org.junit.Test;
55
56 public class TestReplication {
57
58 private static final Log LOG = LogFactory.getLog(TestReplication.class);
59
60 private static Configuration conf1;
61 private static Configuration conf2;
62
63 private static ZooKeeperWatcher zkw1;
64 private static ZooKeeperWatcher zkw2;
65
66 private static ReplicationAdmin admin;
67 private static String slaveClusterKey;
68
69 private static HTable htable1;
70 private static HTable htable2;
71
72 private static HBaseTestingUtility utility1;
73 private static HBaseTestingUtility utility2;
74 private static final int NB_ROWS_IN_BATCH = 100;
75 private static final int NB_ROWS_IN_BIG_BATCH =
76 NB_ROWS_IN_BATCH * 10;
77 private static final long SLEEP_TIME = 500;
78 private static final int NB_RETRIES = 10;
79
80 private static final byte[] tableName = Bytes.toBytes("test");
81 private static final byte[] famName = Bytes.toBytes("f");
82 private static final byte[] row = Bytes.toBytes("row");
83 private static final byte[] noRepfamName = Bytes.toBytes("norep");
84
85
86
87
88 @BeforeClass
89 public static void setUpBeforeClass() throws Exception {
90 conf1 = HBaseConfiguration.create();
91 conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
92
93
94 conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
95 conf1.setInt("replication.source.size.capacity", 1024);
96 conf1.setLong("replication.source.sleepforretries", 100);
97 conf1.setInt("hbase.regionserver.maxlogs", 10);
98 conf1.setLong("hbase.master.logcleaner.ttl", 10);
99 conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
100 conf1.setBoolean("dfs.support.append", true);
101 conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
102
103 utility1 = new HBaseTestingUtility(conf1);
104 utility1.startMiniZKCluster();
105 MiniZooKeeperCluster miniZK = utility1.getZkCluster();
106 zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
107 admin = new ReplicationAdmin(conf1);
108 LOG.info("Setup first Zk");
109
110 conf2 = HBaseConfiguration.create();
111 conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
112 conf2.setInt("hbase.client.retries.number", 6);
113 conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
114 conf2.setBoolean("dfs.support.append", true);
115
116 utility2 = new HBaseTestingUtility(conf2);
117 utility2.setZkCluster(miniZK);
118 zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
119
120 slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
121 conf2.get("hbase.zookeeper.property.clientPort")+":/2";
122 admin.addPeer("2", slaveClusterKey);
123 setIsReplication(true);
124
125 LOG.info("Setup second Zk");
126
127 utility1.startMiniCluster(2);
128 utility2.startMiniCluster(2);
129
130 HTableDescriptor table = new HTableDescriptor(tableName);
131 HColumnDescriptor fam = new HColumnDescriptor(famName);
132 fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
133 table.addFamily(fam);
134 fam = new HColumnDescriptor(noRepfamName);
135 table.addFamily(fam);
136 HBaseAdmin admin1 = new HBaseAdmin(conf1);
137 HBaseAdmin admin2 = new HBaseAdmin(conf2);
138 admin1.createTable(table);
139 admin2.createTable(table);
140
141 htable1 = new HTable(conf1, tableName);
142 htable1.setWriteBufferSize(1024);
143 htable2 = new HTable(conf2, tableName);
144 }
145
146 private static void setIsReplication(boolean rep) throws Exception {
147 LOG.info("Set rep " + rep);
148 admin.setReplicating(rep);
149 Thread.sleep(SLEEP_TIME);
150 }
151
152
153
154
155 @Before
156 public void setUp() throws Exception {
157
158
159
160 for ( JVMClusterUtil.RegionServerThread r :
161 utility1.getHBaseCluster().getRegionServerThreads()) {
162 r.getRegionServer().getWAL().rollWriter();
163 }
164 utility1.truncateTable(tableName);
165
166
167
168
169
170 Scan scan = new Scan();
171 int lastCount = 0;
172 for (int i = 0; i < NB_RETRIES; i++) {
173 if (i==NB_RETRIES-1) {
174 fail("Waited too much time for truncate");
175 }
176 ResultScanner scanner = htable2.getScanner(scan);
177 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
178 scanner.close();
179 if (res.length != 0) {
180 if (res.length < lastCount) {
181 i--;
182 }
183 lastCount = res.length;
184 LOG.info("Still got " + res.length + " rows");
185 Thread.sleep(SLEEP_TIME);
186 } else {
187 break;
188 }
189 }
190 }
191
192
193
194
195 @AfterClass
196 public static void tearDownAfterClass() throws Exception {
197 utility2.shutdownMiniCluster();
198 utility1.shutdownMiniCluster();
199 }
200
201
202
203
204
205 @Test
206 public void testSimplePutDelete() throws Exception {
207 LOG.info("testSimplePutDelete");
208 Put put = new Put(row);
209 put.add(famName, row, row);
210
211 htable1 = new HTable(conf1, tableName);
212 htable1.put(put);
213
214 Get get = new Get(row);
215 for (int i = 0; i < NB_RETRIES; i++) {
216 if (i==NB_RETRIES-1) {
217 fail("Waited too much time for put replication");
218 }
219 Result res = htable2.get(get);
220 if (res.size() == 0) {
221 LOG.info("Row not available");
222 Thread.sleep(SLEEP_TIME);
223 } else {
224 assertArrayEquals(res.value(), row);
225 break;
226 }
227 }
228
229 Delete del = new Delete(row);
230 htable1.delete(del);
231
232 get = new Get(row);
233 for (int i = 0; i < NB_RETRIES; i++) {
234 if (i==NB_RETRIES-1) {
235 fail("Waited too much time for del replication");
236 }
237 Result res = htable2.get(get);
238 if (res.size() >= 1) {
239 LOG.info("Row not deleted");
240 Thread.sleep(SLEEP_TIME);
241 } else {
242 break;
243 }
244 }
245 }
246
247
248
249
250
251 @Test
252 public void testSmallBatch() throws Exception {
253 LOG.info("testSmallBatch");
254 Put put;
255
256 htable1.setAutoFlush(false);
257 for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
258 put = new Put(Bytes.toBytes(i));
259 put.add(famName, row, row);
260 htable1.put(put);
261 }
262 htable1.flushCommits();
263
264 Scan scan = new Scan();
265
266 ResultScanner scanner1 = htable1.getScanner(scan);
267 Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
268 scanner1.close();
269 assertEquals(NB_ROWS_IN_BATCH, res1.length);
270
271 for (int i = 0; i < NB_RETRIES; i++) {
272 if (i==NB_RETRIES-1) {
273 fail("Waited too much time for normal batch replication");
274 }
275 ResultScanner scanner = htable2.getScanner(scan);
276 Result[] res = scanner.next(NB_ROWS_IN_BATCH);
277 scanner.close();
278 if (res.length != NB_ROWS_IN_BATCH) {
279 LOG.info("Only got " + res.length + " rows");
280 Thread.sleep(SLEEP_TIME);
281 } else {
282 break;
283 }
284 }
285
286 htable1.setAutoFlush(true);
287
288 }
289
290
291
292
293
294
295 @Test
296 public void testStartStop() throws Exception {
297
298
299 setIsReplication(false);
300
301 Put put = new Put(Bytes.toBytes("stop start"));
302 put.add(famName, row, row);
303 htable1.put(put);
304
305 Get get = new Get(Bytes.toBytes("stop start"));
306 for (int i = 0; i < NB_RETRIES; i++) {
307 if (i==NB_RETRIES-1) {
308 break;
309 }
310 Result res = htable2.get(get);
311 if(res.size() >= 1) {
312 fail("Replication wasn't stopped");
313
314 } else {
315 LOG.info("Row not replicated, let's wait a bit more...");
316 Thread.sleep(SLEEP_TIME);
317 }
318 }
319
320
321 setIsReplication(true);
322
323 htable1.put(put);
324
325 for (int i = 0; i < NB_RETRIES; i++) {
326 if (i==NB_RETRIES-1) {
327 fail("Waited too much time for put replication");
328 }
329 Result res = htable2.get(get);
330 if(res.size() == 0) {
331 LOG.info("Row not available");
332 Thread.sleep(SLEEP_TIME);
333 } else {
334 assertArrayEquals(res.value(), row);
335 break;
336 }
337 }
338
339 put = new Put(Bytes.toBytes("do not rep"));
340 put.add(noRepfamName, row, row);
341 htable1.put(put);
342
343 get = new Get(Bytes.toBytes("do not rep"));
344 for (int i = 0; i < NB_RETRIES; i++) {
345 if (i == NB_RETRIES-1) {
346 break;
347 }
348 Result res = htable2.get(get);
349 if (res.size() >= 1) {
350 fail("Not supposed to be replicated");
351 } else {
352 LOG.info("Row not replicated, let's wait a bit more...");
353 Thread.sleep(SLEEP_TIME);
354 }
355 }
356
357 }
358
359
360
361
362
363
364 @Test
365 public void testAddAndRemoveClusters() throws Exception {
366 LOG.info("testAddAndRemoveClusters");
367 admin.removePeer("2");
368 Thread.sleep(SLEEP_TIME);
369 byte[] rowKey = Bytes.toBytes("Won't be replicated");
370 Put put = new Put(rowKey);
371 put.add(famName, row, row);
372 htable1.put(put);
373
374 Get get = new Get(rowKey);
375 for (int i = 0; i < NB_RETRIES; i++) {
376 if (i == NB_RETRIES-1) {
377 break;
378 }
379 Result res = htable2.get(get);
380 if (res.size() >= 1) {
381 fail("Not supposed to be replicated");
382 } else {
383 LOG.info("Row not replicated, let's wait a bit more...");
384 Thread.sleep(SLEEP_TIME);
385 }
386 }
387
388 admin.addPeer("2", slaveClusterKey);
389 Thread.sleep(SLEEP_TIME);
390 rowKey = Bytes.toBytes("do rep");
391 put = new Put(rowKey);
392 put.add(famName, row, row);
393 LOG.info("Adding new row");
394 htable1.put(put);
395
396 get = new Get(rowKey);
397 for (int i = 0; i < NB_RETRIES; i++) {
398 if (i==NB_RETRIES-1) {
399 fail("Waited too much time for put replication");
400 }
401 Result res = htable2.get(get);
402 if (res.size() == 0) {
403 LOG.info("Row not available");
404 Thread.sleep(SLEEP_TIME*i);
405 } else {
406 assertArrayEquals(res.value(), row);
407 break;
408 }
409 }
410 }
411
412
413
414
415
416
417 @Test
418 public void loadTesting() throws Exception {
419 htable1.setWriteBufferSize(1024);
420 htable1.setAutoFlush(false);
421 for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
422 Put put = new Put(Bytes.toBytes(i));
423 put.add(famName, row, row);
424 htable1.put(put);
425 }
426 htable1.flushCommits();
427
428 Scan scan = new Scan();
429
430 ResultScanner scanner = htable1.getScanner(scan);
431 Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
432 scanner.close();
433
434 assertEquals(NB_ROWS_IN_BATCH *10, res.length);
435
436 scan = new Scan();
437
438 for (int i = 0; i < NB_RETRIES; i++) {
439
440 scanner = htable2.getScanner(scan);
441 res = scanner.next(NB_ROWS_IN_BIG_BATCH);
442 scanner.close();
443 if (res.length != NB_ROWS_IN_BIG_BATCH) {
444 if (i == NB_RETRIES-1) {
445 int lastRow = -1;
446 for (Result result : res) {
447 int currentRow = Bytes.toInt(result.getRow());
448 for (int row = lastRow+1; row < currentRow; row++) {
449 LOG.error("Row missing: " + row);
450 }
451 lastRow = currentRow;
452 }
453 LOG.error("Last row: " + lastRow);
454 fail("Waited too much time for normal batch replication, "
455 + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH);
456 } else {
457 LOG.info("Only got " + res.length + " rows");
458 Thread.sleep(SLEEP_TIME);
459 }
460 } else {
461 break;
462 }
463 }
464 }
465
466
467
468
469
470
471
472 @Test
473 public void testVerifyRepJob() throws Exception {
474
475
476 testSmallBatch();
477
478 String[] args = new String[] {"2", Bytes.toString(tableName)};
479 Job job = VerifyReplication.createSubmittableJob(conf1, args);
480 if (job == null) {
481 fail("Job wasn't created, see the log");
482 }
483 if (!job.waitForCompletion(true)) {
484 fail("Job failed, see the log");
485 }
486 assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
487 findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
488 assertEquals(0, job.getCounters().
489 findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
490
491 Scan scan = new Scan();
492 ResultScanner rs = htable2.getScanner(scan);
493 Put put = null;
494 for (Result result : rs) {
495 put = new Put(result.getRow());
496 KeyValue firstVal = result.raw()[0];
497 put.add(firstVal.getFamily(),
498 firstVal.getQualifier(), Bytes.toBytes("diff data"));
499 htable2.put(put);
500 }
501 Delete delete = new Delete(put.getRow());
502 htable2.delete(delete);
503 job = VerifyReplication.createSubmittableJob(conf1, args);
504 if (job == null) {
505 fail("Job wasn't created, see the log");
506 }
507 if (!job.waitForCompletion(true)) {
508 fail("Job failed, see the log");
509 }
510 assertEquals(0, job.getCounters().
511 findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
512 assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
513 findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
514 }
515
516
517
518
519
520
521
522
523
524 @Test
525 public void queueFailover() throws Exception {
526 utility1.createMultiRegions(htable1, famName);
527
528
529
530 int rsToKill1 =
531 utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
532 int rsToKill2 =
533 utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
534
535
536 Thread killer1 = killARegionServer(utility1, 7500, rsToKill1);
537 Thread killer2 = killARegionServer(utility2, 10000, rsToKill2);
538
539 LOG.info("Start loading table");
540 int initialCount = utility1.loadTable(htable1, famName);
541 LOG.info("Done loading table");
542 killer1.join(5000);
543 killer2.join(5000);
544 LOG.info("Done waiting for threads");
545
546 Result[] res;
547 while (true) {
548 try {
549 Scan scan = new Scan();
550 ResultScanner scanner = htable1.getScanner(scan);
551 res = scanner.next(initialCount);
552 scanner.close();
553 break;
554 } catch (UnknownScannerException ex) {
555 LOG.info("Cluster wasn't ready yet, restarting scanner");
556 }
557 }
558
559
560 if (res.length != initialCount) {
561 LOG.warn("We lost some rows on the master cluster!");
562
563 initialCount = res.length;
564 }
565
566 Scan scan2 = new Scan();
567
568 int lastCount = 0;
569
570 for (int i = 0; i < NB_RETRIES; i++) {
571 if (i==NB_RETRIES-1) {
572 fail("Waited too much time for queueFailover replication");
573 }
574 ResultScanner scanner2 = htable2.getScanner(scan2);
575 Result[] res2 = scanner2.next(initialCount * 2);
576 scanner2.close();
577 if (res2.length < initialCount) {
578 if (lastCount < res2.length) {
579 i--;
580 }
581 lastCount = res2.length;
582 LOG.info("Only got " + lastCount + " rows instead of " +
583 initialCount + " current i=" + i);
584 Thread.sleep(SLEEP_TIME*2);
585 } else {
586 break;
587 }
588 }
589 }
590
591 private static Thread killARegionServer(final HBaseTestingUtility utility,
592 final long timeout, final int rs) {
593 Thread killer = new Thread() {
594 public void run() {
595 try {
596 Thread.sleep(timeout);
597 utility.expireRegionServerSession(rs);
598 } catch (Exception e) {
599 LOG.error(e);
600 }
601 }
602 };
603 killer.start();
604 return killer;
605 }
606 }