1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.replication.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.HBaseConfiguration;
28  import org.apache.hadoop.hbase.HBaseTestingUtility;
29  import org.apache.hadoop.hbase.HColumnDescriptor;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.HRegionInfo;
32  import org.apache.hadoop.hbase.HTableDescriptor;
33  import org.apache.hadoop.hbase.KeyValue;
34  import org.apache.hadoop.hbase.Server;
35  import org.apache.hadoop.hbase.catalog.CatalogTracker;
36  import org.apache.hadoop.hbase.regionserver.wal.HLog;
37  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39  import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
40  import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
41  import org.apache.hadoop.hbase.util.Bytes;
42  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
43  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44  import org.junit.After;
45  import org.junit.AfterClass;
46  import org.junit.Before;
47  import org.junit.BeforeClass;
48  import org.junit.Ignore;
49  import org.junit.Test;
50  
51  import java.net.URLEncoder;
52  import java.util.ArrayList;
53  import java.util.List;
54  import java.util.concurrent.atomic.AtomicBoolean;
55  
56  import static org.junit.Assert.assertEquals;
57  
58  public class TestReplicationSourceManager {
59  
60    private static final Log LOG =
61        LogFactory.getLog(TestReplicationSourceManager.class);
62  
63    private static Configuration conf;
64  
65    private static HBaseTestingUtility utility;
66  
67    private static Replication replication;
68  
69    private static ReplicationSourceManager manager;
70  
71    private static ZooKeeperWatcher zkw;
72  
73    private static HTableDescriptor htd;
74  
75    private static HRegionInfo hri;
76  
77    private static final byte[] r1 = Bytes.toBytes("r1");
78  
79    private static final byte[] r2 = Bytes.toBytes("r2");
80  
81    private static final byte[] f1 = Bytes.toBytes("f1");
82  
83    private static final byte[] f2 = Bytes.toBytes("f2");
84  
85    private static final byte[] test = Bytes.toBytes("test");
86  
87    private static FileSystem fs;
88  
89    private static Path oldLogDir;
90  
91    private static Path logDir;
92  
93  
94    @BeforeClass
95    public static void setUpBeforeClass() throws Exception {
96  
97      conf = HBaseConfiguration.create();
98      conf.set("replication.replicationsource.implementation",
99          ReplicationSourceDummy.class.getCanonicalName());
100     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
101     utility = new HBaseTestingUtility(conf);
102     utility.startMiniZKCluster();
103 
104     zkw = new ZooKeeperWatcher(conf, "test", null);
105     ZKUtil.createWithParents(zkw, "/hbase/replication");
106     ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
107     ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
108           conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
109           conf.get("hbase.zookeeper.property.clientPort")+":/1"));
110     ZKUtil.createWithParents(zkw, "/hbase/replication/state");
111     ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
112 
113     replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
114     manager = replication.getReplicationManager();
115     fs = FileSystem.get(conf);
116     oldLogDir = new Path(utility.getTestDir(),
117         HConstants.HREGION_OLDLOGDIR_NAME);
118     logDir = new Path(utility.getTestDir(),
119         HConstants.HREGION_LOGDIR_NAME);
120 
121     manager.addSource("1");
122 
123     htd = new HTableDescriptor(test);
124     HColumnDescriptor col = new HColumnDescriptor("f1");
125     col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
126     htd.addFamily(col);
127     col = new HColumnDescriptor("f2");
128     col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
129     htd.addFamily(col);
130 
131     hri = new HRegionInfo(htd, r1, r2);
132 
133 
134   }
135 
136   @AfterClass
137   public static void tearDownAfterClass() throws Exception {
138     manager.join();
139     utility.shutdownMiniCluster();
140   }
141 
142   @Before
143   public void setUp() throws Exception {
144     fs.delete(logDir, true);
145     fs.delete(oldLogDir, true);
146   }
147 
148   @After
149   public void tearDown() throws Exception {
150     setUp();
151   }
152 
153   @Test
154   public void testLogRoll() throws Exception {
155     long seq = 0;
156     long baseline = 1000;
157     long time = baseline;
158     KeyValue kv = new KeyValue(r1, f1, r1);
159     WALEdit edit = new WALEdit();
160     edit.add(kv);
161 
162     List<WALObserver> listeners = new ArrayList<WALObserver>();
163     listeners.add(replication);
164     HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
165       URLEncoder.encode("regionserver:60020", "UTF8"));
166 
167     manager.init();
168 
169     // Testing normal log rolling every 20
170     for(long i = 1; i < 101; i++) {
171       if(i > 1 && i % 20 == 0) {
172         hlog.rollWriter();
173       }
174       LOG.info(i);
175       HLogKey key = new HLogKey(hri.getRegionName(),
176         test, seq++, System.currentTimeMillis());
177       hlog.append(hri, key, edit);
178     }
179 
180     // Simulate a rapid insert that's followed
181     // by a report that's still not totally complete (missing last one)
182     LOG.info(baseline + " and " + time);
183     baseline += 101;
184     time = baseline;
185     LOG.info(baseline + " and " + time);
186 
187     for (int i = 0; i < 3; i++) {
188       HLogKey key = new HLogKey(hri.getRegionName(),
189         test, seq++, System.currentTimeMillis());
190       hlog.append(hri, key, edit);
191     }
192 
193     assertEquals(6, manager.getHLogs().size());
194 
195     hlog.rollWriter();
196 
197     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
198         "1", 0, false);
199 
200     HLogKey key = new HLogKey(hri.getRegionName(),
201           test, seq++, System.currentTimeMillis());
202     hlog.append(hri, key, edit);
203 
204     assertEquals(1, manager.getHLogs().size());
205 
206 
207     // TODO Need a case with only 2 HLogs and we only want to delete the first one
208   }
209 
210   static class DummyServer implements Server {
211 
212     @Override
213     public Configuration getConfiguration() {
214       return conf;
215     }
216 
217     @Override
218     public ZooKeeperWatcher getZooKeeper() {
219       return zkw;
220     }
221 
222     @Override
223     public CatalogTracker getCatalogTracker() {
224       return null;  //To change body of implemented methods use File | Settings | File Templates.
225     }
226 
227     @Override
228     public String getServerName() {
229       return null;  //To change body of implemented methods use File | Settings | File Templates.
230     }
231 
232     @Override
233     public void abort(String why, Throwable e) {
234       //To change body of implemented methods use File | Settings | File Templates.
235     }
236 
237     @Override
238     public void stop(String why) {
239       //To change body of implemented methods use File | Settings | File Templates.
240     }
241 
242     @Override
243     public boolean isStopped() {
244       return false;  //To change body of implemented methods use File | Settings | File Templates.
245     }
246   }
247 
248 }