1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import static org.junit.Assert.assertFalse;
23 import static org.junit.Assert.assertNotNull;
24 import static org.junit.Assert.assertTrue;
25
26 import java.io.IOException;
27 import java.util.concurrent.Semaphore;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.HBaseTestingUtility;
33 import org.apache.hadoop.hbase.HServerAddress;
34 import org.apache.hadoop.hbase.Server;
35 import org.apache.hadoop.hbase.catalog.CatalogTracker;
36 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
37 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
38 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
39 import org.apache.zookeeper.KeeperException;
40 import org.junit.AfterClass;
41 import org.junit.BeforeClass;
42 import org.junit.Test;
43
44
45
46
47 public class TestActiveMasterManager {
48 private final static Log LOG = LogFactory.getLog(TestActiveMasterManager.class);
49 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50
51 @BeforeClass
52 public static void setUpBeforeClass() throws Exception {
53 TEST_UTIL.startMiniZKCluster();
54 }
55
56 @AfterClass
57 public static void tearDownAfterClass() throws Exception {
58 TEST_UTIL.shutdownMiniZKCluster();
59 }
60
61 @Test public void testRestartMaster() throws IOException, KeeperException {
62 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
63 "testActiveMasterManagerFromZK", null);
64 ZKUtil.createAndFailSilent(zk, zk.baseZNode);
65 try {
66 ZKUtil.deleteNode(zk, zk.masterAddressZNode);
67 } catch(KeeperException.NoNodeException nne) {}
68
69
70 HServerAddress master = new HServerAddress("localhost", 1);
71
72 DummyMaster dummyMaster = new DummyMaster();
73 ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
74 master, dummyMaster);
75 zk.registerListener(activeMasterManager);
76 assertFalse(activeMasterManager.clusterHasActiveMaster.get());
77
78
79 activeMasterManager.blockUntilBecomingActiveMaster();
80 assertTrue(activeMasterManager.clusterHasActiveMaster.get());
81 assertMaster(zk, master);
82
83
84 DummyMaster secondDummyMaster = new DummyMaster();
85 ActiveMasterManager secondActiveMasterManager = new ActiveMasterManager(zk,
86 master, secondDummyMaster);
87 zk.registerListener(secondActiveMasterManager);
88 assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
89 activeMasterManager.blockUntilBecomingActiveMaster();
90 assertTrue(activeMasterManager.clusterHasActiveMaster.get());
91 assertMaster(zk, master);
92 }
93
94
95
96
97
98
99 @Test
100 public void testActiveMasterManagerFromZK() throws Exception {
101 ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
102 "testActiveMasterManagerFromZK", null);
103 ZKUtil.createAndFailSilent(zk, zk.baseZNode);
104 try {
105 ZKUtil.deleteNode(zk, zk.masterAddressZNode);
106 } catch(KeeperException.NoNodeException nne) {}
107
108
109 HServerAddress firstMasterAddress = new HServerAddress("localhost", 1);
110 HServerAddress secondMasterAddress = new HServerAddress("localhost", 2);
111
112
113 DummyMaster ms1 = new DummyMaster();
114 ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
115 firstMasterAddress, ms1);
116 zk.registerListener(activeMasterManager);
117 assertFalse(activeMasterManager.clusterHasActiveMaster.get());
118
119
120 activeMasterManager.blockUntilBecomingActiveMaster();
121 assertTrue(activeMasterManager.clusterHasActiveMaster.get());
122 assertMaster(zk, firstMasterAddress);
123
124
125 WaitToBeMasterThread t = new WaitToBeMasterThread(zk, secondMasterAddress);
126 zk.registerListener(t.manager);
127 t.start();
128
129
130 int sleeps = 0;
131 while(!t.manager.clusterHasActiveMaster.get() && sleeps < 100) {
132 Thread.sleep(10);
133 sleeps++;
134 }
135
136
137 assertTrue(activeMasterManager.clusterHasActiveMaster.get());
138 assertTrue(t.manager.clusterHasActiveMaster.get());
139
140 assertFalse(t.isActiveMaster);
141
142
143 ms1.stop("stopping first server");
144
145
146 NodeDeletionListener listener = new NodeDeletionListener(zk, zk.masterAddressZNode);
147 zk.registerListener(listener);
148
149 LOG.info("Deleting master node");
150 ZKUtil.deleteNode(zk, zk.masterAddressZNode);
151
152
153 LOG.info("Waiting for active master manager to be notified");
154 listener.waitForDeletion();
155 LOG.info("Master node deleted");
156
157
158
159 sleeps = 0;
160 while(!t.isActiveMaster && sleeps < 100) {
161 Thread.sleep(10);
162 sleeps++;
163 }
164 LOG.debug("Slept " + sleeps + " times");
165
166 assertTrue(t.manager.clusterHasActiveMaster.get());
167 assertTrue(t.isActiveMaster);
168
169 LOG.info("Deleting master node");
170 ZKUtil.deleteNode(zk, zk.masterAddressZNode);
171 }
172
173
174
175
176
177
178
179 private void assertMaster(ZooKeeperWatcher zk,
180 HServerAddress expectedAddress) throws KeeperException {
181 HServerAddress readAddress = ZKUtil.getDataAsAddress(zk, zk.masterAddressZNode);
182 assertNotNull(readAddress);
183 assertTrue(expectedAddress.equals(readAddress));
184 }
185
186 public static class WaitToBeMasterThread extends Thread {
187
188 ActiveMasterManager manager;
189 boolean isActiveMaster;
190
191 public WaitToBeMasterThread(ZooKeeperWatcher zk,
192 HServerAddress address) {
193 this.manager = new ActiveMasterManager(zk, address,
194 new DummyMaster());
195 isActiveMaster = false;
196 }
197
198 @Override
199 public void run() {
200 manager.blockUntilBecomingActiveMaster();
201 LOG.info("Second master has become the active master!");
202 isActiveMaster = true;
203 }
204 }
205
206 public static class NodeDeletionListener extends ZooKeeperListener {
207 private static final Log LOG = LogFactory.getLog(NodeDeletionListener.class);
208
209 private Semaphore lock;
210 private String node;
211
212 public NodeDeletionListener(ZooKeeperWatcher watcher, String node) {
213 super(watcher);
214 lock = new Semaphore(0);
215 this.node = node;
216 }
217
218 @Override
219 public void nodeDeleted(String path) {
220 if(path.equals(node)) {
221 LOG.debug("nodeDeleted(" + path + ")");
222 lock.release();
223 }
224 }
225
226 public void waitForDeletion() throws InterruptedException {
227 lock.acquire();
228 }
229 }
230
231
232
233
234 public static class DummyMaster implements Server {
235 private volatile boolean stopped;
236
237 @Override
238 public void abort(final String msg, final Throwable t) {}
239
240 @Override
241 public Configuration getConfiguration() {
242 return null;
243 }
244
245 @Override
246 public ZooKeeperWatcher getZooKeeper() {
247 return null;
248 }
249
250 @Override
251 public String getServerName() {
252 return null;
253 }
254
255 @Override
256 public boolean isStopped() {
257 return this.stopped;
258 }
259
260 @Override
261 public void stop(String why) {
262 this.stopped = true;
263 }
264
265 @Override
266 public CatalogTracker getCatalogTracker() {
267 return null;
268 }
269 }
270 }