/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestReplicationSink {
  private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
  private static final int BATCH_SIZE = 10;
  private static final long SLEEP_TIME = 500;

  private final static HBaseTestingUtility TEST_UTIL =
      new HBaseTestingUtility();

  private static ReplicationSink SINK;

  private static final byte[] TABLE_NAME1 =
      Bytes.toBytes("table1");
  private static final byte[] TABLE_NAME2 =
      Bytes.toBytes("table2");

  private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
  private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");

  private static HTable table1;
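  // Minimal Stoppable handed to the ReplicationSink below;
  // tearDownAfterClass() flips it to signal shutdown.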
  private static Stoppable STOPPABLE = new Stoppable() {
    final AtomicBoolean stop = new AtomicBoolean(false);

    @Override
    public boolean isStopped() {
      return this.stop.get();
    }

    @Override
    public void stop(String why) {
      LOG.info("STOPPING BECAUSE: " + why);
      this.stop.set(true);
    }

  };

  private static HTable table2;

  /**
   * @throws java.lang.Exception
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
    TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
    TEST_UTIL.startMiniCluster(3);
    SINK =
      new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
    table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
    table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
  }

  /**
   * @throws java.lang.Exception
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    STOPPABLE.stop("Shutting down");
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * @throws java.lang.Exception
   */
  @Before
  public void setUp() throws Exception {
    table1 = TEST_UTIL.truncateTable(TABLE_NAME1);
    table2 = TEST_UTIL.truncateTable(TABLE_NAME2);
    Thread.sleep(SLEEP_TIME);
  }

  /**
   * Insert a whole batch of entries
   * @throws Exception
   */
  @Test
  public void testBatchSink() throws Exception {
    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
    }
    SINK.replicateEntries(entries);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert a mix of puts and deletes
   * @throws Exception
   */
  @Test
  public void testMixedPutDelete() throws Exception {
    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE/2];
    for(int i = 0; i < BATCH_SIZE/2; i++) {
      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
    }
    SINK.replicateEntries(entries);

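    // Second batch: re-put the odd rows and delete the column on the even
    // rows, so only BATCH_SIZE/2 rows should survive the scan below.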
    entries = new HLog.Entry[BATCH_SIZE];
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries[i] = createEntry(TABLE_NAME1, i,
          i % 2 != 0 ? KeyValue.Type.Put : KeyValue.Type.DeleteColumn);
    }

    SINK.replicateEntries(entries);
    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
  }

  /**
   * Insert to 2 different tables
   * @throws Exception
   */
  @Test
  public void testMixedPutTables() throws Exception {
    HLog.Entry[] entries = new HLog.Entry[BATCH_SIZE];
    for(int i = 0; i < BATCH_SIZE; i++) {
      entries[i] =
          createEntry(i % 2 == 0 ? TABLE_NAME2 : TABLE_NAME1,
              i, KeyValue.Type.Put);
    }

    SINK.replicateEntries(entries);
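    // Even rows were written to table2, so every row the scanner returns
    // from table2 must be even.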
    Scan scan = new Scan();
    ResultScanner scanRes = table2.getScanner(scan);
    for(Result res : scanRes) {
      assertTrue(Bytes.toInt(res.getRow()) % 2 == 0);
    }
  }

  /**
   * Insert then do different types of deletes
   * @throws Exception
   */
  @Test
  public void testMixedDeletes() throws Exception {
    HLog.Entry[] entries = new HLog.Entry[3];
    for(int i = 0; i < 3; i++) {
      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
    }
    SINK.replicateEntries(entries);
    entries = new HLog.Entry[3];

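    // One delete of each flavor: DeleteColumn on rows 0 and 2, DeleteFamily
    // on row 1, which should leave the table empty.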
    entries[0] = createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn);
    entries[1] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
    entries[2] = createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn);

    SINK.replicateEntries(entries);

    Scan scan = new Scan();
    ResultScanner scanRes = table1.getScanner(scan);
    assertEquals(0, scanRes.next(3).length);
  }

  /**
   * Puts are buffered, but deletes are not, so this tests the case where a
   * delete is applied to the table before the buffered Put that creates the
   * row it targets.
   * @throws Exception
   */
  @Test
  public void testApplyDeleteBeforePut() throws Exception {
    HLog.Entry[] entries = new HLog.Entry[5];
    for(int i = 0; i < 2; i++) {
      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
    }
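    // createEntry() sleeps 1 ms per call, so this DeleteFamily for row 1
    // carries a later timestamp than the Put above and masks it even though
    // both are delivered in the same batch.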
    entries[2] = createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily);
    for(int i = 3; i < 5; i++) {
      entries[i] = createEntry(TABLE_NAME1, i, KeyValue.Type.Put);
    }
    SINK.replicateEntries(entries);
    Get get = new Get(Bytes.toBytes(1));
    Result res = table1.get(get);
    assertEquals(0, res.size());
  }

  private HLog.Entry createEntry(byte [] table, int row, KeyValue.Type type) {
    byte[] fam = Bytes.equals(table, TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
    byte[] rowBytes = Bytes.toBytes(row);
    // Make sure we don't get the same timestamp for two consecutive
    // edits to the same row
    try {
      Thread.sleep(1);
    } catch (InterruptedException e) {
      LOG.info("Interrupted while sleeping between entries", e);
    }
    final long now = System.currentTimeMillis();
    KeyValue kv = null;
    if(type.getCode() == KeyValue.Type.Put.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam, now,
          KeyValue.Type.Put, Bytes.toBytes(row));
    } else if (type.getCode() == KeyValue.Type.DeleteColumn.getCode()) {
      kv = new KeyValue(rowBytes, fam, fam,
          now, KeyValue.Type.DeleteColumn);
    } else if (type.getCode() == KeyValue.Type.DeleteFamily.getCode()) {
      kv = new KeyValue(rowBytes, fam, null,
          now, KeyValue.Type.DeleteFamily);
    }

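    // Wrap the single KeyValue in a WALEdit and pair it with a minimal
    // HLogKey, reusing the table name and timestamp for the remaining
    // key fields.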
    HLogKey key = new HLogKey(table, table, now, now);

    WALEdit edit = new WALEdit();
    edit.add(kv);

    return new HLog.Entry(key, edit);
  }
}