1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.util.concurrent.atomic.AtomicBoolean;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.hbase.HServerAddress;
27 import org.apache.hadoop.hbase.Server;
28 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
29 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
30 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
31 import org.apache.zookeeper.KeeperException;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 class ActiveMasterManager extends ZooKeeperListener {
47 private static final Log LOG = LogFactory.getLog(ActiveMasterManager.class);
48
49 final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
50
51 private final HServerAddress address;
52 private final Server master;
53
54 ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address,
55 Server master) {
56 super(watcher);
57 this.address = address;
58 this.master = master;
59 }
60
61 @Override
62 public void nodeCreated(String path) {
63 if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) {
64 handleMasterNodeChange();
65 }
66 }
67
68 @Override
69 public void nodeDeleted(String path) {
70 if(path.equals(watcher.masterAddressZNode) && !master.isStopped()) {
71 handleMasterNodeChange();
72 }
73 }
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 private void handleMasterNodeChange() {
89
90 try {
91 synchronized(clusterHasActiveMaster) {
92 if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
93
94 LOG.debug("A master is now available");
95 clusterHasActiveMaster.set(true);
96 } else {
97
98 LOG.debug("No master available. Notifying waiting threads");
99 clusterHasActiveMaster.set(false);
100
101 clusterHasActiveMaster.notifyAll();
102 }
103 }
104 } catch (KeeperException ke) {
105 master.abort("Received an unexpected KeeperException, aborting", ke);
106 }
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121 boolean blockUntilBecomingActiveMaster() {
122 boolean cleanSetOfActiveMaster = true;
123
124 try {
125 if (ZKUtil.setAddressAndWatch(this.watcher,
126 this.watcher.masterAddressZNode, this.address)) {
127
128 this.clusterHasActiveMaster.set(true);
129 LOG.info("Master=" + this.address);
130 return cleanSetOfActiveMaster;
131 }
132 cleanSetOfActiveMaster = false;
133
134
135
136 this.clusterHasActiveMaster.set(true);
137 HServerAddress currentMaster =
138 ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode);
139 if (currentMaster != null && currentMaster.equals(this.address)) {
140 LOG.info("Current master has this master's address, " + currentMaster +
141 "; master was restarted? Waiting on znode to expire...");
142
143 ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
144 } else {
145 LOG.info("Another master is the active master, " + currentMaster +
146 "; waiting to become the next active master");
147 }
148 } catch (KeeperException ke) {
149 master.abort("Received an unexpected KeeperException, aborting", ke);
150 return false;
151 }
152 synchronized (this.clusterHasActiveMaster) {
153 while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
154 try {
155 this.clusterHasActiveMaster.wait();
156 } catch (InterruptedException e) {
157
158 LOG.debug("Interrupted waiting for master to die", e);
159 }
160 }
161 if (this.master.isStopped()) {
162 return cleanSetOfActiveMaster;
163 }
164
165 blockUntilBecomingActiveMaster();
166 }
167 return cleanSetOfActiveMaster;
168 }
169
170
171
172
173 public boolean isActiveMaster() {
174 try {
175 if (ZKUtil.checkExists(watcher, watcher.masterAddressZNode) >= 0) {
176 return true;
177 }
178 }
179 catch (KeeperException ke) {
180 LOG.info("Received an unexpected KeeperException when checking " +
181 "isActiveMaster : "+ ke);
182 }
183 return false;
184 }
185
186 public void stop() {
187 try {
188
189 HServerAddress zkAddress =
190 ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode);
191
192 if(zkAddress != null &&
193 zkAddress.equals(address)) {
194 ZKUtil.deleteNode(watcher, watcher.masterAddressZNode);
195 }
196 } catch (KeeperException e) {
197 LOG.error(this.watcher.prefix("Error deleting our own master address node"), e);
198 }
199 }
200 }