1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.io.IOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.Set;
29 import java.util.concurrent.ConcurrentHashMap;
30
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.ClockOutOfSyncException;
35 import org.apache.hadoop.hbase.HMsg;
36 import org.apache.hadoop.hbase.HRegionInfo;
37 import org.apache.hadoop.hbase.HServerAddress;
38 import org.apache.hadoop.hbase.HServerInfo;
39 import org.apache.hadoop.hbase.HServerLoad;
40 import org.apache.hadoop.hbase.PleaseHoldException;
41 import org.apache.hadoop.hbase.Server;
42 import org.apache.hadoop.hbase.YouAreDeadException;
43 import org.apache.hadoop.hbase.catalog.CatalogTracker;
44 import org.apache.hadoop.hbase.client.HConnection;
45 import org.apache.hadoop.hbase.client.HConnectionManager;
46 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
47 import org.apache.hadoop.hbase.ipc.HRegionInterface;
48 import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
49 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
50 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
51 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class ServerManager {
68 private static final Log LOG = LogFactory.getLog(ServerManager.class);
69
70
71 private volatile boolean clusterShutdown = false;
72
73
74 private final Map<String, HServerInfo> onlineServers =
75 new ConcurrentHashMap<String, HServerInfo>();
76
77
78
79
80
81 private final Map<String, HRegionInterface> serverConnections =
82 new HashMap<String, HRegionInterface>();
83
84 private final Server master;
85 private final MasterServices services;
86
87
88 private final MasterMetrics metrics;
89
90 private final DeadServer deadservers;
91
92 private final long maxSkew;
93
94
95
96
97
98
99
100 public ServerManager(final Server master, final MasterServices services,
101 MasterMetrics metrics) {
102 this.master = master;
103 this.services = services;
104 this.metrics = metrics;
105 Configuration c = master.getConfiguration();
106 maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
107 this.deadservers =
108 new DeadServer(c.getInt("hbase.master.maxdeadservers", 100));
109 }
110
111
112
113
114
115
116
117 void regionServerStartup(final HServerInfo serverInfo, long serverCurrentTime)
118 throws IOException {
119
120
121
122
123
124
125
126 HServerInfo info = new HServerInfo(serverInfo);
127 checkIsDead(info.getServerName(), "STARTUP");
128 checkAlreadySameHostPort(info);
129 checkClockSkew(info, serverCurrentTime);
130 recordNewServer(info, false, null);
131 }
132
133
134
135
136
137
138 void checkAlreadySameHostPort(final HServerInfo serverInfo)
139 throws PleaseHoldException {
140 String hostAndPort = serverInfo.getServerAddress().toString();
141 HServerInfo existingServer =
142 haveServerWithSameHostAndPortAlready(serverInfo.getHostnamePort());
143 if (existingServer != null) {
144 String message = "Server start rejected; we already have " + hostAndPort +
145 " registered; existingServer=" + existingServer + ", newServer=" + serverInfo;
146 LOG.info(message);
147 if (existingServer.getStartCode() < serverInfo.getStartCode()) {
148 LOG.info("Triggering server recovery; existingServer " +
149 existingServer.getServerName() + " looks stale");
150 expireServer(existingServer);
151 }
152 throw new PleaseHoldException(message);
153 }
154 }
155
156 private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
157 synchronized (this.onlineServers) {
158 for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
159 if (e.getValue().getHostnamePort().equals(hostnamePort)) {
160 return e.getValue();
161 }
162 }
163 }
164 return null;
165 }
166
167
168
169
170
171
172 private void checkClockSkew(final HServerInfo serverInfo,
173 final long serverCurrentTime)
174 throws ClockOutOfSyncException {
175 long skew = System.currentTimeMillis() - serverCurrentTime;
176 if (skew > maxSkew) {
177 String message = "Server " + serverInfo.getServerName() + " has been " +
178 "rejected; Reported time is too far out of sync with master. " +
179 "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
180 LOG.warn(message);
181 throw new ClockOutOfSyncException(message);
182 }
183 }
184
185
186
187
188
189
190
191
192
193 private void checkIsDead(final String serverName, final String what)
194 throws YouAreDeadException {
195 if (this.deadservers.isDeadServer(serverName)) {
196
197
198 String message = "Server " + what + " rejected; currently processing " +
199 serverName + " as dead server";
200 LOG.debug(message);
201 throw new YouAreDeadException(message);
202 }
203
204 if (this.deadservers.cleanPreviousInstance(serverName)) {
205
206
207 LOG.debug("Server " + serverName + " came back up, removed it from the" +
208 " dead servers list");
209 }
210 }
211
212
213
214
215
216
217
218
219 void recordNewServer(HServerInfo info, boolean useInfoLoad,
220 HRegionInterface hri) {
221 HServerLoad load = useInfoLoad? info.getLoad(): new HServerLoad();
222 String serverName = info.getServerName();
223 LOG.info("Registering server=" + serverName + ", regionCount=" +
224 load.getLoad() + ", userLoad=" + useInfoLoad);
225 info.setLoad(load);
226
227
228
229
230
231 this.onlineServers.put(serverName, info);
232 if (hri == null) {
233 serverConnections.remove(serverName);
234 } else {
235 serverConnections.put(serverName, hri);
236 }
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252 HMsg [] regionServerReport(final HServerInfo serverInfo,
253 final HMsg [] msgs, final HRegionInfo[] mostLoadedRegions)
254 throws IOException {
255
256 HServerInfo info = new HServerInfo(serverInfo);
257
258
259 checkIsDead(info.getServerName(), "REPORT");
260
261
262 HServerInfo storedInfo = this.onlineServers.get(info.getServerName());
263 if (storedInfo == null) {
264
265
266 checkAlreadySameHostPort(info);
267
268
269
270
271
272 recordNewServer(info, true, null);
273
274
275
276
277
278 if (msgs.length > 0)
279 throw new PleaseHoldException("FIX! Putting off " +
280 "message processing because not yet ready but possible we won't be " +
281 "ready next on next report");
282
283 storedInfo = this.onlineServers.get(info.getServerName());
284 }
285
286
287 if (raceThatShouldNotHappenAnymore(storedInfo, info)) {
288 return HMsg.STOP_REGIONSERVER_ARRAY;
289 }
290
291 for (HMsg msg: msgs) {
292 LOG.info("Received " + msg + " from " + serverInfo.getServerName());
293 switch (msg.getType()) {
294 case REGION_SPLIT:
295 this.services.getAssignmentManager().handleSplitReport(serverInfo,
296 msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB());
297 break;
298
299 default:
300 LOG.error("Unhandled msg type " + msg);
301 }
302 }
303
304 HMsg [] reply = null;
305 int numservers = countOfRegionServers();
306 if (this.clusterShutdown) {
307 if (numservers <= 2) {
308
309
310
311
312
313 reply = HMsg.STOP_REGIONSERVER_ARRAY;
314 }
315 }
316 return processRegionServerAllsWell(info, mostLoadedRegions, reply);
317 }
318
319 private boolean raceThatShouldNotHappenAnymore(final HServerInfo storedInfo,
320 final HServerInfo reportedInfo) {
321 if (storedInfo.getStartCode() != reportedInfo.getStartCode()) {
322
323
324
325
326
327
328
329 LOG.warn("Race condition detected: " + reportedInfo.getServerName());
330 synchronized (this.onlineServers) {
331 removeServerInfo(reportedInfo.getServerName());
332 notifyOnlineServers();
333 }
334 return true;
335 }
336 return false;
337 }
338
339
340
341
342
343
344
345
346
347 private HMsg[] processRegionServerAllsWell(HServerInfo serverInfo,
348 final HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
349 throws IOException {
350
351 this.onlineServers.put(serverInfo.getServerName(), serverInfo);
352 HServerLoad load = serverInfo.getLoad();
353 if (load != null && this.metrics != null) {
354 this.metrics.incrementRequests(load.getNumberOfRequests());
355 }
356
357 return msgs;
358 }
359
360
361
362
363
364 private boolean removeServerInfo(final String serverName) {
365 HServerInfo info = this.onlineServers.remove(serverName);
366 if (info != null) {
367 return true;
368 }
369 return false;
370 }
371
372
373
374
375
376
377
378 public double getAverageLoad() {
379 int totalLoad = 0;
380 int numServers = 0;
381 double averageLoad = 0.0;
382 for (HServerInfo hsi : onlineServers.values()) {
383 numServers++;
384 totalLoad += hsi.getLoad().getNumberOfRegions();
385 }
386 averageLoad = (double)totalLoad / (double)numServers;
387 return averageLoad;
388 }
389
390
391 int countOfRegionServers() {
392
393 return this.onlineServers.size();
394 }
395
396
397
398
399
400 public HServerInfo getServerInfo(String name) {
401 return this.onlineServers.get(name);
402 }
403
404
405
406
407 public Map<String, HServerInfo> getOnlineServers() {
408
409 synchronized (this.onlineServers) {
410 return Collections.unmodifiableMap(this.onlineServers);
411 }
412 }
413
414 public Set<String> getDeadServers() {
415 return this.deadservers.clone();
416 }
417
418
419
420
421
422 public boolean areDeadServersInProgress() {
423 return this.deadservers.areDeadServersInProgress();
424 }
425
426
427
428
429
430
431 public HServerInfo getHServerInfo(final HServerAddress hsa) {
432 synchronized(this.onlineServers) {
433
434 for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
435 if (e.getValue().getServerAddress().equals(hsa)) {
436 return e.getValue();
437 }
438 }
439 }
440 return null;
441 }
442
443 private void notifyOnlineServers() {
444 synchronized (this.onlineServers) {
445 this.onlineServers.notifyAll();
446 }
447 }
448
449
450
451
452
453
454
455 void letRegionServersShutdown() {
456 synchronized (onlineServers) {
457 while (onlineServers.size() > 0) {
458 StringBuilder sb = new StringBuilder();
459 for (String key: this.onlineServers.keySet()) {
460 if (sb.length() > 0) {
461 sb.append(", ");
462 }
463 sb.append(key);
464 }
465 LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
466 try {
467 this.onlineServers.wait(1000);
468 } catch (InterruptedException e) {
469
470 }
471 }
472 }
473 }
474
475
476
477
478
479 public synchronized void expireServer(final HServerInfo hsi) {
480
481
482 String serverName = hsi.getServerName();
483 HServerInfo info = this.onlineServers.get(serverName);
484 if (info == null) {
485 LOG.warn("Received expiration of " + hsi.getServerName() +
486 " but server is not currently online");
487 return;
488 }
489 if (this.deadservers.contains(serverName)) {
490
491 LOG.warn("Received expiration of " + hsi.getServerName() +
492 " but server shutdown is already in progress");
493 return;
494 }
495
496
497
498 this.deadservers.add(serverName);
499 this.onlineServers.remove(serverName);
500 this.serverConnections.remove(serverName);
501
502
503 if (this.clusterShutdown) {
504 LOG.info("Cluster shutdown set; " + hsi.getServerName() +
505 " expired; onlineServers=" + this.onlineServers.size());
506 if (this.onlineServers.isEmpty()) {
507 master.stop("Cluster shutdown set; onlineServer=0");
508 }
509 return;
510 }
511 CatalogTracker ct = this.master.getCatalogTracker();
512
513 boolean carryingRoot;
514 try {
515 HServerAddress address = ct.getRootLocation();
516 carryingRoot = address != null &&
517 hsi.getServerAddress().equals(address);
518 } catch (InterruptedException e) {
519 Thread.currentThread().interrupt();
520 LOG.info("Interrupted");
521 return;
522 }
523
524
525
526
527
528 HServerAddress address = ct.getMetaLocation();
529 boolean carryingMeta =
530 address != null && hsi.getServerAddress().equals(address);
531 if (carryingRoot || carryingMeta) {
532 this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
533 this.services, this.deadservers, info, carryingRoot, carryingMeta));
534 } else {
535 this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
536 this.services, this.deadservers, info));
537 }
538 LOG.debug("Added=" + serverName +
539 " to dead servers, submitted shutdown handler to be executed, root=" +
540 carryingRoot + ", meta=" + carryingMeta);
541 }
542
543
544
545
546
547
548
549
550
551
552
553 public void sendRegionOpen(HServerInfo server, HRegionInfo region)
554 throws IOException {
555 HRegionInterface hri = getServerConnection(server);
556 if (hri == null) {
557 LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
558 + " failed because no RPC connection found to this server");
559 return;
560 }
561 hri.openRegion(region);
562 }
563
564
565
566
567
568
569
570
571
572 public void sendRegionOpen(HServerInfo server, List<HRegionInfo> regions)
573 throws IOException {
574 HRegionInterface hri = getServerConnection(server);
575 if (hri == null) {
576 LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
577 + " failed because no RPC connection found to this server");
578 return;
579 }
580 hri.openRegions(regions);
581 }
582
583
584
585
586
587
588
589
590
591
592
593 public boolean sendRegionClose(HServerInfo server, HRegionInfo region)
594 throws IOException {
595 if (server == null) throw new NullPointerException("Passed server is null");
596 HRegionInterface hri = getServerConnection(server);
597 if (hri == null) {
598 throw new IOException("Attempting to send CLOSE RPC to server " +
599 server.getServerName() + " for region " +
600 region.getRegionNameAsString() +
601 " failed because no RPC connection found to this server");
602 }
603 return hri.closeRegion(region);
604 }
605
606
607
608
609
610
611
612
613 private HRegionInterface getServerConnection(HServerInfo info)
614 throws IOException {
615 HConnection connection =
616 HConnectionManager.getConnection(this.master.getConfiguration());
617 HRegionInterface hri = serverConnections.get(info.getServerName());
618 if (hri == null) {
619 LOG.debug("New connection to " + info.getServerName());
620 hri = connection.getHRegionConnection(info.getServerAddress(), false);
621 this.serverConnections.put(info.getServerName(), hri);
622 }
623 return hri;
624 }
625
626
627
628
629
630
631 public int waitForRegionServers()
632 throws InterruptedException {
633 long interval = this.master.getConfiguration().
634 getLong("hbase.master.wait.on.regionservers.interval", 1500);
635 long timeout = this.master.getConfiguration().
636 getLong("hbase.master.wait.on.regionservers.timeout", 4500);
637 int minToStart = this.master.getConfiguration().
638 getInt("hbase.master.wait.on.regionservers.mintostart", 1);
639 int maxToStart = this.master.getConfiguration().
640 getInt("hbase.master.wait.on.regionservers.maxtostart", Integer.MAX_VALUE);
641
642
643 int count = 0;
644 long slept = 0;
645 for (int oldcount = countOfRegionServers(); !this.master.isStopped();) {
646 Thread.sleep(interval);
647 slept += interval;
648 count = countOfRegionServers();
649 if (count == oldcount && count >= minToStart && slept >= timeout) {
650 LOG.info("Finished waiting for regionserver count to settle; " +
651 "count=" + count + ", sleptFor=" + slept);
652 break;
653 }
654 if (count >= maxToStart) {
655 LOG.info("At least the max configured number of regionserver(s) have " +
656 "checked in: " + count);
657 break;
658 }
659 if (count == 0) {
660 LOG.info("Waiting on regionserver(s) to checkin");
661 } else {
662 LOG.info("Waiting on regionserver(s) count to settle; currently=" + count);
663 }
664 oldcount = count;
665 }
666
667
668
669
670
671 int regionCount = 0;
672 for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
673 HServerLoad load = e.getValue().getLoad();
674 if (load != null) regionCount += load.getLoad();
675 }
676 LOG.info("Exiting wait on regionserver(s) to checkin; count=" + count +
677 ", stopped=" + this.master.isStopped() +
678 ", count of regions out on cluster=" + regionCount);
679 return regionCount;
680 }
681
682
683
684
685 public List<HServerInfo> getOnlineServersList() {
686
687 return new ArrayList<HServerInfo>(onlineServers.values());
688 }
689
690 public boolean isServerOnline(String serverName) {
691 return onlineServers.containsKey(serverName);
692 }
693
694 public void shutdownCluster() {
695 this.clusterShutdown = true;
696 this.master.stop("Cluster shutdown requested");
697 }
698
699 public boolean isClusterShutdown() {
700 return this.clusterShutdown;
701 }
702
703
704
705
706 public void stop() {
707
708 }
709 }