/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestReplication {

  private static final Log LOG = LogFactory.getLog(TestReplication.class);

  private static Configuration conf1;
  private static Configuration conf2;

  private static ZooKeeperWatcher zkw1;
  private static ZooKeeperWatcher zkw2;

  private static ReplicationAdmin admin;
  private static String slaveClusterKey;

  private static HTable htable1;
  private static HTable htable2;

  private static HBaseTestingUtility utility1;
  private static HBaseTestingUtility utility2;
  private static final int NB_ROWS_IN_BATCH = 100;
  private static final int NB_ROWS_IN_BIG_BATCH =
      NB_ROWS_IN_BATCH * 10;
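  // Each polling loop below retries up to NB_RETRIES times, sleeping
  // SLEEP_TIME ms between attempts, before failing the test.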
  private static final long SLEEP_TIME = 500;
  private static final int NB_RETRIES = 10;

  private static final byte[] tableName = Bytes.toBytes("test");
  private static final byte[] famName = Bytes.toBytes("f");
  private static final byte[] row = Bytes.toBytes("row");
  private static final byte[] noRepfamName = Bytes.toBytes("norep");

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    conf1 = HBaseConfiguration.create();
    conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
    // smaller block size and capacity to trigger more operations
    // and test them
    conf1.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
    conf1.setInt("replication.source.size.capacity", 1024);
    conf1.setLong("replication.source.sleepforretries", 100);
    conf1.setInt("hbase.regionserver.maxlogs", 10);
    conf1.setLong("hbase.master.logcleaner.ttl", 10);
    conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf1.setBoolean("dfs.support.append", true);
    conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);

    utility1 = new HBaseTestingUtility(conf1);
    utility1.startMiniZKCluster();
    MiniZooKeeperCluster miniZK = utility1.getZkCluster();
    zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
    admin = new ReplicationAdmin(conf1);
    LOG.info("Setup first Zk");

    conf2 = HBaseConfiguration.create();
    conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
    conf2.setInt("hbase.client.retries.number", 6);
    conf2.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    conf2.setBoolean("dfs.support.append", true);

    utility2 = new HBaseTestingUtility(conf2);
    utility2.setZkCluster(miniZK);
    zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
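    // A peer cluster is identified by a "cluster key" of the form
    // zookeeper.quorum:zookeeper.client.port:znode.parent, built here
    // from the slave cluster's configuration.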
    slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
            conf2.get("hbase.zookeeper.property.clientPort") + ":/2";
    admin.addPeer("2", slaveClusterKey);
    setIsReplication(true);

    LOG.info("Setup second Zk");

    utility1.startMiniCluster(2);
    utility2.startMiniCluster(2);

    HTableDescriptor table = new HTableDescriptor(tableName);
    HColumnDescriptor fam = new HColumnDescriptor(famName);
    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
    table.addFamily(fam);
    fam = new HColumnDescriptor(noRepfamName);
    table.addFamily(fam);
    HBaseAdmin admin1 = new HBaseAdmin(conf1);
    HBaseAdmin admin2 = new HBaseAdmin(conf2);
    admin1.createTable(table);
    admin2.createTable(table);

    htable1 = new HTable(conf1, tableName);
    htable1.setWriteBufferSize(1024);
    htable2 = new HTable(conf2, tableName);
  }

  private static void setIsReplication(boolean rep) throws Exception {
    LOG.info("Set rep " + rep);
    admin.setReplicating(rep);
    Thread.sleep(SLEEP_TIME);
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {

    // Starting and stopping replication can make us miss new logs,
    // rolling like this makes sure the most recent one gets added to the queue
    for (JVMClusterUtil.RegionServerThread r :
        utility1.getHBaseCluster().getRegionServerThreads()) {
      r.getRegionServer().getWAL().rollWriter();
    }
    utility1.truncateTable(tableName);
    // truncating the table will send one Delete per row to the slave cluster
    // in an async fashion, which is why we cannot just call truncateTable on
    // utility2 since late writes could make it to the slave in some way.
    // Instead, we truncate the first table and wait for all the Deletes to
    // make it to the slave.
    Scan scan = new Scan();
    int lastCount = 0;
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for truncate");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != 0) {
        if (res.length < lastCount) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res.length;
        LOG.info("Still got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    utility2.shutdownMiniCluster();
    utility1.shutdownMiniCluster();
  }

  /**
   * Add a row, check it's replicated, delete it, check it's gone
   * @throws Exception
   */
  @Test
  public void testSimplePutDelete() throws Exception {
    LOG.info("testSimplePutDelete");
    Put put = new Put(row);
    put.add(famName, row, row);

    htable1 = new HTable(conf1, tableName);
    htable1.put(put);

    Get get = new Get(row);
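    // Replication is asynchronous, so poll the slave cluster until the
    // row shows up or the retry budget runs out.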
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    Delete del = new Delete(row);
    htable1.delete(del);

    get = new Get(row);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for del replication");
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        LOG.info("Row not deleted");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }
  }

  /**
   * Try a small batch upload using the write buffer, check it's replicated
   * @throws Exception
   */
  @Test
  public void testSmallBatch() throws Exception {
    LOG.info("testSmallBatch");
    Put put;
    // normal Batch tests
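    // Disabling auto-flush buffers the puts client-side; they are shipped
    // to the region server when the write buffer fills or when
    // flushCommits() is called below.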
    htable1.setAutoFlush(false);
    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
      put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      htable1.put(put);
    }
    htable1.flushCommits();

    Scan scan = new Scan();

    ResultScanner scanner1 = htable1.getScanner(scan);
    Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH);
    scanner1.close();
    assertEquals(NB_ROWS_IN_BATCH, res1.length);

    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for normal batch replication");
      }
      ResultScanner scanner = htable2.getScanner(scan);
      Result[] res = scanner.next(NB_ROWS_IN_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BATCH) {
        LOG.info("Only got " + res.length + " rows");
        Thread.sleep(SLEEP_TIME);
      } else {
        break;
      }
    }

    htable1.setAutoFlush(true);
  }

  /**
   * Test that stopping replication keeps inserts from reaching the slave,
   * then re-enable it and make sure replication works again
   * @throws Exception
   */
  @Test
  public void testStartStop() throws Exception {

    // Test stopping replication
    setIsReplication(false);

    Put put = new Put(Bytes.toBytes("stop start"));
    put.add(famName, row, row);
    htable1.put(put);

    Get get = new Get(Bytes.toBytes("stop start"));
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Replication wasn't stopped");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    // Test restart replication
    setIsReplication(true);

    htable1.put(put);

    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }

    put = new Put(Bytes.toBytes("do not rep"));
    put.add(noRepfamName, row, row);
    htable1.put(put);

    get = new Get(Bytes.toBytes("do not rep"));
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }
  }

  /**
   * Integration test for TestReplicationAdmin, removes and re-adds a peer
   * cluster
   * @throws Exception
   */
  @Test
  public void testAddAndRemoveClusters() throws Exception {
    LOG.info("testAddAndRemoveClusters");
    admin.removePeer("2");
    Thread.sleep(SLEEP_TIME);
    byte[] rowKey = Bytes.toBytes("Won't be replicated");
    Put put = new Put(rowKey);
    put.add(famName, row, row);
    htable1.put(put);

    Get get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        break;
      }
      Result res = htable2.get(get);
      if (res.size() >= 1) {
        fail("Not supposed to be replicated");
      } else {
        LOG.info("Row not replicated, let's wait a bit more...");
        Thread.sleep(SLEEP_TIME);
      }
    }

    admin.addPeer("2", slaveClusterKey);
    Thread.sleep(SLEEP_TIME);
    rowKey = Bytes.toBytes("do rep");
    put = new Put(rowKey);
    put.add(famName, row, row);
    LOG.info("Adding new row");
    htable1.put(put);

    get = new Get(rowKey);
    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for put replication");
      }
      Result res = htable2.get(get);
      if (res.size() == 0) {
        LOG.info("Row not available");
        Thread.sleep(SLEEP_TIME * i);
      } else {
        assertArrayEquals(row, res.value());
        break;
      }
    }
  }

  /**
   * Do a more intense version of testSmallBatch, one that will trigger
   * hlog rolling and other non-trivial code paths
   * @throws Exception
   */
  @Test
  public void loadTesting() throws Exception {
    htable1.setWriteBufferSize(1024);
    htable1.setAutoFlush(false);
    for (int i = 0; i < NB_ROWS_IN_BIG_BATCH; i++) {
      Put put = new Put(Bytes.toBytes(i));
      put.add(famName, row, row);
      htable1.put(put);
    }
    htable1.flushCommits();

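    // Before checking the slave, make sure the master cluster itself
    // really holds all NB_ROWS_IN_BIG_BATCH rows.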
    Scan scan = new Scan();

    ResultScanner scanner = htable1.getScanner(scan);
    Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
    scanner.close();

    assertEquals(NB_ROWS_IN_BIG_BATCH, res.length);

    scan = new Scan();

    for (int i = 0; i < NB_RETRIES; i++) {
      scanner = htable2.getScanner(scan);
      res = scanner.next(NB_ROWS_IN_BIG_BATCH);
      scanner.close();
      if (res.length != NB_ROWS_IN_BIG_BATCH) {
        if (i == NB_RETRIES - 1) {
          int lastRow = -1;
          for (Result result : res) {
            int currentRow = Bytes.toInt(result.getRow());
            for (int rowNum = lastRow + 1; rowNum < currentRow; rowNum++) {
              LOG.error("Row missing: " + rowNum);
            }
            lastRow = currentRow;
          }
          LOG.error("Last row: " + lastRow);
          fail("Waited too much time for normal batch replication, "
              + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH);
        } else {
          LOG.info("Only got " + res.length + " rows");
          Thread.sleep(SLEEP_TIME);
        }
      } else {
        break;
      }
    }
  }

  /**
   * Load a small batch into a table, make sure the data is really the same,
   * then run the VerifyReplication job to check the results. Do a second
   * comparison where all the cells are different.
   * @throws Exception
   */
  @Test
  public void testVerifyRepJob() throws Exception {
    // Populating the tables this way also guarantees that they are
    // identical, since testSmallBatch does the verification itself
    testSmallBatch();

    String[] args = new String[] {"2", Bytes.toString(tableName)};
    Job job = VerifyReplication.createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());

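    // Now make every row on the slave differ from the master: overwrite the
    // first cell of each row with different data and delete one row outright,
    // so the second VerifyReplication run should flag every row as bad.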
    Scan scan = new Scan();
    ResultScanner rs = htable2.getScanner(scan);
    Put put = null;
    for (Result result : rs) {
      put = new Put(result.getRow());
      KeyValue firstVal = result.raw()[0];
      put.add(firstVal.getFamily(),
          firstVal.getQualifier(), Bytes.toBytes("diff data"));
      htable2.put(put);
    }
    Delete delete = new Delete(put.getRow());
    htable2.delete(delete);
    job = VerifyReplication.createSubmittableJob(conf1, args);
    if (job == null) {
      fail("Job wasn't created, see the log");
    }
    if (!job.waitForCompletion(true)) {
      fail("Job failed, see the log");
    }
    assertEquals(0, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
    assertEquals(NB_ROWS_IN_BATCH, job.getCounters().
        findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
  }

  /**
   * Load up multiple tables over 2 region servers and kill a source during
   * the upload. The failover happens internally.
   *
   * WARNING this test sometimes fails because of HBASE-3515
   *
   * @throws Exception
   */
  @Test
  public void queueFailover() throws Exception {
    utility1.createMultiRegions(htable1, famName);

    // killing the RS with .META. can result in failed puts until we solve
    // IO fencing
    int rsToKill1 =
        utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
    int rsToKill2 =
        utility2.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;

    // Takes about 20 secs to run the full loading, kill around the middle
    Thread killer1 = killARegionServer(utility1, 7500, rsToKill1);
    Thread killer2 = killARegionServer(utility2, 10000, rsToKill2);

    LOG.info("Start loading table");
    int initialCount = utility1.loadTable(htable1, famName);
    LOG.info("Done loading table");
    killer1.join(5000);
    killer2.join(5000);
    LOG.info("Done waiting for threads");

    Result[] res;
    while (true) {
      try {
        Scan scan = new Scan();
        ResultScanner scanner = htable1.getScanner(scan);
        res = scanner.next(initialCount);
        scanner.close();
        break;
      } catch (UnknownScannerException ex) {
        LOG.info("Cluster wasn't ready yet, restarting scanner");
      }
    }
    // Test that we actually have all the rows; we may miss some because we
    // don't have IO fencing.
    if (res.length != initialCount) {
      LOG.warn("We lost some rows on the master cluster!");
      // We don't really expect the other cluster to have more rows
      initialCount = res.length;
    }

    Scan scan2 = new Scan();

    int lastCount = 0;

    for (int i = 0; i < NB_RETRIES; i++) {
      if (i == NB_RETRIES - 1) {
        fail("Waited too much time for queueFailover replication");
      }
      ResultScanner scanner2 = htable2.getScanner(scan2);
      Result[] res2 = scanner2.next(initialCount * 2);
      scanner2.close();
      if (res2.length < initialCount) {
        if (lastCount < res2.length) {
          i--; // Don't increment timeout if we make progress
        }
        lastCount = res2.length;
        LOG.info("Only got " + lastCount + " rows instead of " +
            initialCount + " current i=" + i);
        Thread.sleep(SLEEP_TIME * 2);
      } else {
        break;
      }
    }
  }

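  // Expiring a region server's ZooKeeper session simulates an abrupt crash;
  // the surviving region server should then claim the dead server's
  // replication queue and keep shipping its edits to the slave.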
  private static Thread killARegionServer(final HBaseTestingUtility utility,
                                          final long timeout, final int rs) {
    Thread killer = new Thread() {
      @Override
      public void run() {
        try {
          Thread.sleep(timeout);
          utility.expireRegionServerSession(rs);
        } catch (Exception e) {
          LOG.error(e);
        }
      }
    };
    killer.start();
    return killer;
  }
}