1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.replication.regionserver;
21
22 import org.apache.commons.logging.Log;
23 import org.apache.commons.logging.LogFactory;
24 import org.apache.hadoop.conf.Configuration;
25 import org.apache.hadoop.fs.FileSystem;
26 import org.apache.hadoop.fs.Path;
27 import org.apache.hadoop.hbase.HBaseConfiguration;
28 import org.apache.hadoop.hbase.HBaseTestingUtility;
29 import org.apache.hadoop.hbase.HColumnDescriptor;
30 import org.apache.hadoop.hbase.HConstants;
31 import org.apache.hadoop.hbase.HRegionInfo;
32 import org.apache.hadoop.hbase.HTableDescriptor;
33 import org.apache.hadoop.hbase.KeyValue;
34 import org.apache.hadoop.hbase.Server;
35 import org.apache.hadoop.hbase.catalog.CatalogTracker;
36 import org.apache.hadoop.hbase.regionserver.wal.HLog;
37 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
38 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
39 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
40 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
41 import org.apache.hadoop.hbase.util.Bytes;
42 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
43 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
44 import org.junit.After;
45 import org.junit.AfterClass;
46 import org.junit.Before;
47 import org.junit.BeforeClass;
48 import org.junit.Ignore;
49 import org.junit.Test;
50
51 import java.net.URLEncoder;
52 import java.util.ArrayList;
53 import java.util.List;
54 import java.util.concurrent.atomic.AtomicBoolean;
55
56 import static org.junit.Assert.assertEquals;
57
58 public class TestReplicationSourceManager {
59
60 private static final Log LOG =
61 LogFactory.getLog(TestReplicationSourceManager.class);
62
63 private static Configuration conf;
64
65 private static HBaseTestingUtility utility;
66
67 private static Replication replication;
68
69 private static ReplicationSourceManager manager;
70
71 private static ZooKeeperWatcher zkw;
72
73 private static HTableDescriptor htd;
74
75 private static HRegionInfo hri;
76
77 private static final byte[] r1 = Bytes.toBytes("r1");
78
79 private static final byte[] r2 = Bytes.toBytes("r2");
80
81 private static final byte[] f1 = Bytes.toBytes("f1");
82
83 private static final byte[] f2 = Bytes.toBytes("f2");
84
85 private static final byte[] test = Bytes.toBytes("test");
86
87 private static FileSystem fs;
88
89 private static Path oldLogDir;
90
91 private static Path logDir;
92
93
94 @BeforeClass
95 public static void setUpBeforeClass() throws Exception {
96
97 conf = HBaseConfiguration.create();
98 conf.set("replication.replicationsource.implementation",
99 ReplicationSourceDummy.class.getCanonicalName());
100 conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
101 utility = new HBaseTestingUtility(conf);
102 utility.startMiniZKCluster();
103
104 zkw = new ZooKeeperWatcher(conf, "test", null);
105 ZKUtil.createWithParents(zkw, "/hbase/replication");
106 ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
107 ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
108 conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
109 conf.get("hbase.zookeeper.property.clientPort")+":/1"));
110 ZKUtil.createWithParents(zkw, "/hbase/replication/state");
111 ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
112
113 replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
114 manager = replication.getReplicationManager();
115 fs = FileSystem.get(conf);
116 oldLogDir = new Path(utility.getTestDir(),
117 HConstants.HREGION_OLDLOGDIR_NAME);
118 logDir = new Path(utility.getTestDir(),
119 HConstants.HREGION_LOGDIR_NAME);
120
121 manager.addSource("1");
122
123 htd = new HTableDescriptor(test);
124 HColumnDescriptor col = new HColumnDescriptor("f1");
125 col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
126 htd.addFamily(col);
127 col = new HColumnDescriptor("f2");
128 col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
129 htd.addFamily(col);
130
131 hri = new HRegionInfo(htd, r1, r2);
132
133
134 }
135
136 @AfterClass
137 public static void tearDownAfterClass() throws Exception {
138 manager.join();
139 utility.shutdownMiniCluster();
140 }
141
142 @Before
143 public void setUp() throws Exception {
144 fs.delete(logDir, true);
145 fs.delete(oldLogDir, true);
146 }
147
148 @After
149 public void tearDown() throws Exception {
150 setUp();
151 }
152
153 @Test
154 public void testLogRoll() throws Exception {
155 long seq = 0;
156 long baseline = 1000;
157 long time = baseline;
158 KeyValue kv = new KeyValue(r1, f1, r1);
159 WALEdit edit = new WALEdit();
160 edit.add(kv);
161
162 List<WALObserver> listeners = new ArrayList<WALObserver>();
163 listeners.add(replication);
164 HLog hlog = new HLog(fs, logDir, oldLogDir, conf, listeners,
165 URLEncoder.encode("regionserver:60020", "UTF8"));
166
167 manager.init();
168
169
170 for(long i = 1; i < 101; i++) {
171 if(i > 1 && i % 20 == 0) {
172 hlog.rollWriter();
173 }
174 LOG.info(i);
175 HLogKey key = new HLogKey(hri.getRegionName(),
176 test, seq++, System.currentTimeMillis());
177 hlog.append(hri, key, edit);
178 }
179
180
181
182 LOG.info(baseline + " and " + time);
183 baseline += 101;
184 time = baseline;
185 LOG.info(baseline + " and " + time);
186
187 for (int i = 0; i < 3; i++) {
188 HLogKey key = new HLogKey(hri.getRegionName(),
189 test, seq++, System.currentTimeMillis());
190 hlog.append(hri, key, edit);
191 }
192
193 assertEquals(6, manager.getHLogs().size());
194
195 hlog.rollWriter();
196
197 manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
198 "1", 0, false);
199
200 HLogKey key = new HLogKey(hri.getRegionName(),
201 test, seq++, System.currentTimeMillis());
202 hlog.append(hri, key, edit);
203
204 assertEquals(1, manager.getHLogs().size());
205
206
207
208 }
209
210 static class DummyServer implements Server {
211
212 @Override
213 public Configuration getConfiguration() {
214 return conf;
215 }
216
217 @Override
218 public ZooKeeperWatcher getZooKeeper() {
219 return zkw;
220 }
221
222 @Override
223 public CatalogTracker getCatalogTracker() {
224 return null;
225 }
226
227 @Override
228 public String getServerName() {
229 return null;
230 }
231
232 @Override
233 public void abort(String why, Throwable e) {
234
235 }
236
237 @Override
238 public void stop(String why) {
239
240 }
241
242 @Override
243 public boolean isStopped() {
244 return false;
245 }
246 }
247
248 }