1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.master;
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.lang.Thread.UncaughtExceptionHandler;
26 import java.util.ArrayList;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Set;
34 import java.util.SortedMap;
35 import java.util.TreeMap;
36 import java.util.TreeSet;
37 import java.util.concurrent.ConcurrentSkipListMap;
38 import java.util.concurrent.ConcurrentSkipListSet;
39 import java.util.concurrent.atomic.AtomicInteger;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.conf.Configuration;
44 import org.apache.hadoop.hbase.Chore;
45 import org.apache.hadoop.hbase.HConstants;
46 import org.apache.hadoop.hbase.HRegionInfo;
47 import org.apache.hadoop.hbase.HServerAddress;
48 import org.apache.hadoop.hbase.HServerInfo;
49 import org.apache.hadoop.hbase.HTableDescriptor;
50 import org.apache.hadoop.hbase.NotServingRegionException;
51 import org.apache.hadoop.hbase.Server;
52 import org.apache.hadoop.hbase.Stoppable;
53 import org.apache.hadoop.hbase.catalog.CatalogTracker;
54 import org.apache.hadoop.hbase.catalog.MetaReader;
55 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
56 import org.apache.hadoop.hbase.client.Result;
57 import org.apache.hadoop.hbase.executor.EventHandler.EventType;
58 import org.apache.hadoop.hbase.executor.ExecutorService;
59 import org.apache.hadoop.hbase.executor.RegionTransitionData;
60 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
61 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
62 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
63 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
64 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
65 import org.apache.hadoop.hbase.util.Bytes;
66 import org.apache.hadoop.hbase.util.Pair;
67 import org.apache.hadoop.hbase.util.Threads;
68 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
69 import org.apache.hadoop.hbase.zookeeper.ZKTable;
70 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71 import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
72 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
73 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
74 import org.apache.hadoop.io.Writable;
75 import org.apache.hadoop.ipc.RemoteException;
76 import org.apache.zookeeper.AsyncCallback;
77 import org.apache.zookeeper.KeeperException;
78 import org.apache.zookeeper.KeeperException.NoNodeException;
79 import org.apache.zookeeper.data.Stat;
80
81
82
83
84
85
86
87
88 public class AssignmentManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

// The master server this manager runs in; source of configuration, the
// ZooKeeper watcher, the server name and the stopped/abort signals.
protected Server master;

// Used to look up online servers and to send open/close RPCs to them.
private ServerManager serverManager;

// Used to locate the catalog regions and to read region rows from META.
private CatalogTracker catalogTracker;

// Chore started as a daemon thread by the constructor; its period and
// timeout come from the hbase.master.assignment.timeoutmonitor.* settings.
private TimeoutMonitor timeoutMonitor;

// Upper bound on the retry loop in assign(RegionState, boolean, boolean);
// read from "hbase.assignment.maximum.attempts" (default 10).
private final int maximumAssignmentAttempts;

// Regions currently in transition, keyed by encoded region name.
// Callers in this class synchronize on the map itself and use
// wait()/notifyAll() on it to block until a region leaves transition.
final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
  new ConcurrentSkipListMap<String, RegionState>();

// Assignment plans keyed by encoded region name.
// Accessed while synchronized on the map itself.
final NavigableMap<String, RegionPlan> regionPlans =
  new TreeMap<String, RegionPlan>();

// Table state (disabled/disabling) kept in ZooKeeper.
private final ZKTable zkTable;

// Server to the set of regions recorded as hosted on it.
// setOffline() touches this map while holding the monitor of 'regions'.
private final NavigableMap<HServerInfo, Set<HRegionInfo>> servers =
  new TreeMap<HServerInfo, Set<HRegionInfo>>();

// Region to the server recorded as hosting it.
// Accessed while synchronized on the map itself; waitForAssignment() waits
// on it and regionOnline() notifies.
private final SortedMap<HRegionInfo,HServerInfo> regions =
  new TreeMap<HRegionInfo,HServerInfo>();

// Executor that region opened/closed handlers are submitted to.
private final ExecutorService executorService;
141
142
143
144
145
146
147
148
149
150
151 public AssignmentManager(Server master, ServerManager serverManager,
152 CatalogTracker catalogTracker, final ExecutorService service)
153 throws KeeperException {
154 super(master.getZooKeeper());
155 this.master = master;
156 this.serverManager = serverManager;
157 this.catalogTracker = catalogTracker;
158 this.executorService = service;
159 Configuration conf = master.getConfiguration();
160 this.timeoutMonitor = new TimeoutMonitor(
161 conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
162 master,
163 conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
164 Threads.setDaemonThreadRunning(timeoutMonitor,
165 master.getServerName() + ".timeoutMonitor");
166 this.zkTable = new ZKTable(this.master.getZooKeeper());
167 this.maximumAssignmentAttempts =
168 this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
169 }
170
171
172
173
174 public ZKTable getZKTable() {
175
176
177 return this.zkTable;
178 }
179
180
181
182
183
184
185
186 void cleanoutUnassigned() throws IOException, KeeperException {
187
188 ZKAssign.deleteAllNodes(watcher);
189 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
190 this.watcher.assignmentZNode);
191 }
192
193
194
195
196
197
198
199
200 void processFailover() throws KeeperException, IOException, InterruptedException {
201
202
203
204
205
206
207
208
209
210
211
212 HServerInfo hsi =
213 this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation());
214 regionOnline(HRegionInfo.FIRST_META_REGIONINFO, hsi);
215 hsi = this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation());
216 regionOnline(HRegionInfo.ROOT_REGIONINFO, hsi);
217
218
219
220 Map<String, List<Pair<HRegionInfo, Result>>> deadServers =
221 rebuildUserRegions();
222
223
224 processDeadServers(deadServers);
225
226 processRegionsInTransition(deadServers);
227 }
228
229
230
231
232
233
234
235
236 void processRegionsInTransition()
237 throws KeeperException, IOException, InterruptedException {
238
239 processRegionsInTransition(null);
240 }
241
242
243
244
245
246
247
248
249
250 void processRegionsInTransition(
251 final Map<String, List<Pair<HRegionInfo, Result>>> deadServers)
252 throws KeeperException, IOException, InterruptedException {
253 List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
254 watcher.assignmentZNode);
255 if (nodes.isEmpty()) {
256 LOG.info("No regions in transition in ZK to process on failover");
257 return;
258 }
259 LOG.info("Failed-over master needs to process " + nodes.size() +
260 " regions in transition");
261 for (String encodedRegionName: nodes) {
262 processRegionInTransition(encodedRegionName, null, deadServers);
263 }
264 }
265
266
267
268
269
270
271
272
273
274
275
276
277 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
278 throws InterruptedException, KeeperException, IOException {
279 boolean intransistion =
280 processRegionInTransition(hri.getEncodedName(), hri, null);
281 if (!intransistion) return intransistion;
282 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
283 synchronized(this.regionsInTransition) {
284 while (!this.master.isStopped() &&
285 this.regionsInTransition.containsKey(hri.getEncodedName())) {
286 this.regionsInTransition.wait();
287 }
288 }
289 return intransistion;
290 }
291
292
293
294
295
296
297
298
299
300
301
302 boolean processRegionInTransition(final String encodedRegionName,
303 final HRegionInfo regionInfo,
304 final Map<String, List<Pair<HRegionInfo,Result>>> deadServers)
305 throws KeeperException, IOException {
306 RegionTransitionData data = ZKAssign.getData(watcher, encodedRegionName);
307 if (data == null) return false;
308 HRegionInfo hri = regionInfo;
309 if (hri == null) {
310 Pair<HRegionInfo, HServerAddress> p =
311 MetaReader.getRegion(catalogTracker, data.getRegionName());
312 if (p == null) return false;
313 hri = p.getFirst();
314 }
315 processRegionsInTransition(data, hri, deadServers);
316 return true;
317 }
318
319 void processRegionsInTransition(final RegionTransitionData data,
320 final HRegionInfo regionInfo,
321 final Map<String, List<Pair<HRegionInfo,Result>>> deadServers)
322 throws KeeperException {
323 String encodedRegionName = regionInfo.getEncodedName();
324 LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
325 " in state " + data.getEventType());
326 synchronized (regionsInTransition) {
327 switch (data.getEventType()) {
328 case RS_ZK_REGION_CLOSING:
329 if (isOnDeadServer(regionInfo, deadServers)) {
330
331
332 forceOffline(regionInfo, data);
333 } else {
334
335
336 regionsInTransition.put(encodedRegionName, new RegionState(
337 regionInfo, RegionState.State.CLOSING, data.getStamp()));
338 }
339 break;
340
341 case RS_ZK_REGION_CLOSED:
342
343 addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
344 break;
345
346 case M_ZK_REGION_OFFLINE:
347
348 addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
349 break;
350
351 case RS_ZK_REGION_OPENING:
352
353
354
355
356
357 regionsInTransition.put(encodedRegionName, new RegionState(
358 regionInfo, RegionState.State.OPENING, data.getStamp()));
359 break;
360
361 case RS_ZK_REGION_OPENED:
362
363 regionsInTransition.put(encodedRegionName, new RegionState(
364 regionInfo, RegionState.State.OPEN, data.getStamp()));
365 String sn = data.getServerName();
366
367
368
369 if (sn == null) {
370 LOG.warn("Region in transition " + regionInfo.getEncodedName() +
371 " references a server no longer up " + data.getServerName() +
372 "; letting RIT timeout so will be assigned elsewhere");
373 break;
374 }
375 if (isOnDeadServer(regionInfo, deadServers)) {
376
377 forceOffline(regionInfo, data);
378 } else {
379 HServerInfo hsi = this.serverManager.getServerInfo(sn);
380 if (hsi == null) {
381 LOG.info("Failed to find " + sn +
382 " in list of online servers; skipping registration of open of " +
383 regionInfo.getRegionNameAsString());
384 } else {
385 new OpenedRegionHandler(master, this, regionInfo, hsi).process();
386 }
387 }
388 break;
389 }
390 }
391 }
392
393
394
395
396
397
398
399 private void forceOffline(final HRegionInfo hri,
400 final RegionTransitionData oldData)
401 throws KeeperException {
402
403
404 LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
405 oldData.getEventType() + " was on deadserver; forcing offline");
406 ZKAssign.createOrForceNodeOffline(this.watcher, hri,
407 this.master.getServerName());
408 addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
409 }
410
411
412
413
414
415
416
417
418 private void addToRITandCallClose(final HRegionInfo hri,
419 final RegionState.State state, final RegionTransitionData oldData) {
420 this.regionsInTransition.put(hri.getEncodedName(),
421 new RegionState(hri, state, oldData.getStamp()));
422 new ClosedRegionHandler(this.master, this, hri).process();
423 }
424
425
426
427
428
429
430
431 private boolean isOnDeadServer(final HRegionInfo regionInfo,
432 final Map<String, List<Pair<HRegionInfo, Result>>> deadServers) {
433 if (deadServers == null) return false;
434 for (Map.Entry<String, List<Pair<HRegionInfo, Result>>> deadServer:
435 deadServers.entrySet()) {
436 for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
437 if (e.getFirst().equals(regionInfo)) return true;
438 }
439 }
440 return false;
441 }
442
443
444
445
446
447
448
449
450
451
452 private void handleRegion(final RegionTransitionData data) {
453 synchronized(regionsInTransition) {
454 if (data == null || data.getServerName() == null) {
455 LOG.warn("Unexpected NULL input " + data);
456 return;
457 }
458
459 if (data.getServerName().equals(HConstants.HBCK_CODE_NAME)) {
460 handleHBCK(data);
461 return;
462 }
463
464 if (!serverManager.isServerOnline(data.getServerName()) &&
465 !this.master.getServerName().equals(data.getServerName())) {
466 LOG.warn("Attempted to handle region transition for server but " +
467 "server is not online: " + Bytes.toString(data.getRegionName()));
468 return;
469 }
470 String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
471 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
472
473 boolean lateEvent = data.getStamp() <
474 (System.currentTimeMillis() - 15000);
475 LOG.debug("Handling transition=" + data.getEventType() +
476 ", server=" + data.getServerName() + ", region=" +
477 prettyPrintedRegionName +
478 (lateEvent? ", which is more than 15 seconds late" : ""));
479 RegionState regionState = regionsInTransition.get(encodedName);
480 switch (data.getEventType()) {
481 case M_ZK_REGION_OFFLINE:
482
483 break;
484
485 case RS_ZK_REGION_CLOSING:
486
487
488 if (regionState == null ||
489 (!regionState.isPendingClose() && !regionState.isClosing())) {
490 LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
491 " from server " + data.getServerName() + " but region was in " +
492 " the state " + regionState + " and not " +
493 "in expected PENDING_CLOSE or CLOSING states");
494 return;
495 }
496
497 regionState.update(RegionState.State.CLOSING, data.getStamp());
498 break;
499
500 case RS_ZK_REGION_CLOSED:
501
502 if (regionState == null ||
503 (!regionState.isPendingClose() && !regionState.isClosing())) {
504 LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
505 " from server " + data.getServerName() + " but region was in " +
506 " the state " + regionState + " and not " +
507 "in expected PENDING_CLOSE or CLOSING states");
508 return;
509 }
510
511
512
513 regionState.update(RegionState.State.CLOSED, data.getStamp());
514 this.executorService.submit(new ClosedRegionHandler(master,
515 this, regionState.getRegion()));
516 break;
517
518 case RS_ZK_REGION_OPENING:
519
520
521 if (regionState == null ||
522 (!regionState.isPendingOpen() && !regionState.isOpening())) {
523 LOG.warn("Received OPENING for region " +
524 prettyPrintedRegionName +
525 " from server " + data.getServerName() + " but region was in " +
526 " the state " + regionState + " and not " +
527 "in expected PENDING_OPEN or OPENING states");
528 return;
529 }
530
531 regionState.update(RegionState.State.OPENING, data.getStamp());
532 break;
533
534 case RS_ZK_REGION_OPENED:
535
536 if (regionState == null ||
537 (!regionState.isPendingOpen() && !regionState.isOpening())) {
538 LOG.warn("Received OPENED for region " +
539 prettyPrintedRegionName +
540 " from server " + data.getServerName() + " but region was in " +
541 " the state " + regionState + " and not " +
542 "in expected PENDING_OPEN or OPENING states");
543 return;
544 }
545
546 regionState.update(RegionState.State.OPEN, data.getStamp());
547 this.executorService.submit(
548 new OpenedRegionHandler(master, this, regionState.getRegion(),
549 this.serverManager.getServerInfo(data.getServerName())));
550 break;
551 }
552 }
553 }
554
555
556
557
558
559
560
561 private void handleHBCK(RegionTransitionData data) {
562 String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
563 LOG.info("Handling HBCK triggered transition=" + data.getEventType() +
564 ", server=" + data.getServerName() + ", region=" +
565 HRegionInfo.prettyPrint(encodedName));
566 RegionState regionState = regionsInTransition.get(encodedName);
567 switch (data.getEventType()) {
568 case M_ZK_REGION_OFFLINE:
569 HRegionInfo regionInfo = null;
570 if (regionState != null) {
571 regionInfo = regionState.getRegion();
572 } else {
573 try {
574 regionInfo = MetaReader.getRegion(catalogTracker,
575 data.getRegionName()).getFirst();
576 } catch (IOException e) {
577 LOG.info("Exception reading META doing HBCK repair operation", e);
578 return;
579 }
580 }
581 LOG.info("HBCK repair is triggering assignment of region=" +
582 regionInfo.getRegionNameAsString());
583
584 assign(regionInfo, false);
585 break;
586
587 default:
588 LOG.warn("Received unexpected region state from HBCK (" +
589 data.getEventType() + ")");
590 break;
591 }
592 }
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608 @Override
609 public void nodeCreated(String path) {
610 if(path.startsWith(watcher.assignmentZNode)) {
611 synchronized(regionsInTransition) {
612 try {
613 RegionTransitionData data = ZKAssign.getData(watcher, path);
614 if(data == null) {
615 return;
616 }
617 handleRegion(data);
618 } catch (KeeperException e) {
619 master.abort("Unexpected ZK exception reading unassigned node data", e);
620 }
621 }
622 }
623 }
624
625
626
627
628
629
630
631
632
633
634
635
636
637 @Override
638 public void nodeDataChanged(String path) {
639 if(path.startsWith(watcher.assignmentZNode)) {
640 synchronized(regionsInTransition) {
641 try {
642 RegionTransitionData data = ZKAssign.getData(watcher, path);
643 if(data == null) {
644 return;
645 }
646 handleRegion(data);
647 } catch (KeeperException e) {
648 master.abort("Unexpected ZK exception reading unassigned node data", e);
649 }
650 }
651 }
652 }
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667 @Override
668 public void nodeChildrenChanged(String path) {
669 if(path.equals(watcher.assignmentZNode)) {
670 synchronized(regionsInTransition) {
671 try {
672 List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
673 watcher.assignmentZNode);
674 for(NodeAndData newNode : newNodes) {
675 LOG.debug("Handling new unassigned node: " + newNode);
676 handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
677 }
678 } catch(KeeperException e) {
679 master.abort("Unexpected ZK exception reading unassigned children", e);
680 }
681 }
682 }
683 }
684
685
686
687
688
689
690
691
692
693 public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) {
694 synchronized (this.regionsInTransition) {
695 RegionState rs =
696 this.regionsInTransition.remove(regionInfo.getEncodedName());
697 if (rs != null) {
698 this.regionsInTransition.notifyAll();
699 }
700 }
701 synchronized (this.regions) {
702
703 HServerInfo hsi = this.regions.get(regionInfo);
704 if (hsi != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
705 " on " + hsi);
706
707 HServerInfo hsiWithoutLoad = new HServerInfo(
708 serverInfo.getServerAddress(), serverInfo.getStartCode(),
709 serverInfo.getInfoPort(), serverInfo.getHostname());
710
711 if (isServerOnline(hsiWithoutLoad.getServerName())) {
712 this.regions.put(regionInfo, hsiWithoutLoad);
713 addToServers(hsiWithoutLoad, regionInfo);
714 this.regions.notifyAll();
715 } else {
716 LOG.info("The server is not in online servers, ServerName=" +
717 hsiWithoutLoad.getServerName() + ", region=" +
718 regionInfo.getEncodedName());
719 }
720 }
721
722 clearRegionPlan(regionInfo);
723
724 updateTimers(serverInfo);
725 }
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740 private void updateTimers(final HServerInfo hsi) {
741
742
743
744
745 Map<String, RegionPlan> copy = new HashMap<String, RegionPlan>();
746 synchronized(this.regionPlans) {
747 copy.putAll(this.regionPlans);
748 }
749 for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
750 if (!e.getValue().getDestination().equals(hsi)) continue;
751 RegionState rs = null;
752 synchronized (this.regionsInTransition) {
753 rs = this.regionsInTransition.get(e.getKey());
754 }
755 if (rs == null) continue;
756 synchronized (rs) {
757 rs.update(rs.getState());
758 }
759 }
760 }
761
762
763
764
765
766
767
768
769 public void regionOffline(final HRegionInfo regionInfo) {
770 synchronized(this.regionsInTransition) {
771 if (this.regionsInTransition.remove(regionInfo.getEncodedName()) != null) {
772 this.regionsInTransition.notifyAll();
773 }
774 }
775
776 clearRegionPlan(regionInfo);
777 setOffline(regionInfo);
778 }
779
780
781
782
783
784
785
786
787 public void setOffline(HRegionInfo regionInfo) {
788 synchronized (this.regions) {
789 HServerInfo serverInfo = this.regions.remove(regionInfo);
790 if (serverInfo == null) return;
791 Set<HRegionInfo> serverRegions = this.servers.get(serverInfo);
792 if (!serverRegions.remove(regionInfo)) {
793 LOG.warn("No " + regionInfo + " on " + serverInfo);
794 }
795 }
796 }
797
798 public void offlineDisabledRegion(HRegionInfo regionInfo) {
799
800 LOG.debug("Table being disabled so deleting ZK node and removing from " +
801 "regions in transition, skipping assignment of region " +
802 regionInfo.getRegionNameAsString());
803 try {
804 if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
805
806 ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
807 }
808 } catch (KeeperException.NoNodeException nne) {
809 LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
810 "does not exist so just offlining");
811 } catch (KeeperException e) {
812 this.master.abort("Error deleting CLOSED node in ZK", e);
813 }
814 regionOffline(regionInfo);
815 }
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837 public void assign(HRegionInfo region, boolean setOfflineInZK) {
838 assign(region, setOfflineInZK, false);
839 }
840
841 public void assign(HRegionInfo region, boolean setOfflineInZK,
842 boolean forceNewPlan) {
843 String tableName = region.getTableDesc().getNameAsString();
844 boolean disabled = this.zkTable.isDisabledTable(tableName);
845 if (disabled || this.zkTable.isDisablingTable(tableName)) {
846 LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
847 " skipping assign of " + region.getRegionNameAsString());
848 offlineDisabledRegion(region);
849 return;
850 }
851 if (this.serverManager.isClusterShutdown()) {
852 LOG.info("Cluster shutdown is set; skipping assign of " +
853 region.getRegionNameAsString());
854 return;
855 }
856 RegionState state = addToRegionsInTransition(region);
857 synchronized (state) {
858 assign(state, setOfflineInZK, forceNewPlan);
859 }
860 }
861
862
863
864
865
866
867 void assign(final HServerInfo destination,
868 final List<HRegionInfo> regions) {
869 LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
870 destination.getServerName());
871
872 List<RegionState> states = new ArrayList<RegionState>(regions.size());
873 synchronized (this.regionsInTransition) {
874 for (HRegionInfo region: regions) {
875 states.add(forceRegionStateToOffline(region));
876 }
877 }
878
879
880 AtomicInteger counter = new AtomicInteger(0);
881 CreateUnassignedAsyncCallback cb =
882 new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
883 for (RegionState state: states) {
884 if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
885 return;
886 }
887 }
888
889 int total = regions.size();
890 for (int oldCounter = 0; true;) {
891 int count = counter.get();
892 if (oldCounter != count) {
893 LOG.info(destination.getServerName() + " unassigned znodes=" + count +
894 " of total=" + total);
895 oldCounter = count;
896 }
897 if (count == total) break;
898 Threads.sleep(1);
899 }
900 try {
901 long maxWaitTime = System.currentTimeMillis() +
902 this.master.getConfiguration().
903 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
904 while (!this.master.isStopped()) {
905 try {
906 this.serverManager.sendRegionOpen(destination, regions);
907 break;
908 } catch (org.apache.hadoop.hbase.ipc.ServerNotRunningException e) {
909
910
911 long now = System.currentTimeMillis();
912 if (now > maxWaitTime) throw e;
913 LOG.debug("Server is not yet up; waiting up to " +
914 (maxWaitTime - now) + "ms", e);
915 Thread.sleep(1000);
916 }
917 }
918 } catch (IOException e) {
919 throw new RuntimeException(e);
920 } catch (InterruptedException e) {
921 throw new RuntimeException(e);
922 }
923 LOG.debug("Bulk assigning done for " + destination.getServerName());
924 }
925
926
927
928
929 static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
930 private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
931 private final ZooKeeperWatcher zkw;
932 private final HServerInfo destination;
933 private final AtomicInteger counter;
934
935 CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
936 final HServerInfo destination, final AtomicInteger counter) {
937 this.zkw = zkw;
938 this.destination = destination;
939 this.counter = counter;
940 }
941
942 @Override
943 public void processResult(int rc, String path, Object ctx, String name) {
944 if (rc != 0) {
945
946 LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
947 "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
948 this.zkw.abort("Connectionloss writing unassigned at " + path +
949 ", rc=" + rc, null);
950 return;
951 }
952 LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.getServerName());
953
954
955 this.zkw.getZooKeeper().exists(path, this.zkw,
956 new ExistsUnassignedAsyncCallback(this.counter), ctx);
957 }
958 }
959
960
961
962
963
964 static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
965 private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
966 private final AtomicInteger counter;
967
968 ExistsUnassignedAsyncCallback(final AtomicInteger counter) {
969 this.counter = counter;
970 }
971
972 @Override
973 public void processResult(int rc, String path, Object ctx, Stat stat) {
974 if (rc != 0) {
975
976 LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
977 "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
978 return;
979 }
980 RegionState state = (RegionState)ctx;
981 LOG.debug("rs=" + state);
982
983
984
985
986
987 state.update(RegionState.State.PENDING_OPEN);
988 this.counter.addAndGet(1);
989 }
990 }
991
992
993
994
995
996 private RegionState addToRegionsInTransition(final HRegionInfo region) {
997 synchronized (regionsInTransition) {
998 return forceRegionStateToOffline(region);
999 }
1000 }
1001
1002
1003
1004
1005
1006
1007
1008 private RegionState forceRegionStateToOffline(final HRegionInfo region) {
1009 String encodedName = region.getEncodedName();
1010 RegionState state = this.regionsInTransition.get(encodedName);
1011 if (state == null) {
1012 state = new RegionState(region, RegionState.State.OFFLINE);
1013 this.regionsInTransition.put(encodedName, state);
1014 } else {
1015 LOG.debug("Forcing OFFLINE; was=" + state);
1016 state.update(RegionState.State.OFFLINE);
1017 }
1018 return state;
1019 }
1020
1021
1022
1023
1024
1025
1026
1027 private void assign(final RegionState state, final boolean setOfflineInZK,
1028 final boolean forceNewPlan) {
1029 for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
1030 if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
1031 if (this.master.isStopped()) {
1032 LOG.debug("Server stopped; skipping assign of " + state);
1033 return;
1034 }
1035 RegionPlan plan = getRegionPlan(state, forceNewPlan);
1036 if (plan == null) return;
1037 try {
1038 LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
1039 " to " + plan.getDestination().getServerName());
1040
1041 state.update(RegionState.State.PENDING_OPEN);
1042
1043 serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
1044 break;
1045 } catch (Throwable t) {
1046 LOG.warn("Failed assignment of " +
1047 state.getRegion().getRegionNameAsString() + " to " +
1048 plan.getDestination() + ", trying to assign elsewhere instead; " +
1049 "retry=" + i, t);
1050
1051
1052
1053 state.update(RegionState.State.OFFLINE);
1054
1055
1056 if (getRegionPlan(state, plan.getDestination(), true) == null) {
1057 LOG.warn("Unable to find a viable location to assign region " +
1058 state.getRegion().getRegionNameAsString());
1059 return;
1060 }
1061 }
1062 }
1063 }
1064
1065
1066
1067
1068
1069
1070
1071 boolean setOfflineInZooKeeper(final RegionState state) {
1072 if (!state.isClosed() && !state.isOffline()) {
1073 new RuntimeException("Unexpected state trying to OFFLINE; " + state);
1074 this.master.abort("Unexpected state trying to OFFLINE; " + state,
1075 new IllegalStateException());
1076 return false;
1077 }
1078 state.update(RegionState.State.OFFLINE);
1079 try {
1080 if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
1081 state.getRegion(), master.getServerName())) {
1082 LOG.warn("Attempted to create/force node into OFFLINE state before " +
1083 "completing assignment but failed to do so for " + state);
1084 return false;
1085 }
1086 } catch (KeeperException e) {
1087 master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
1088 return false;
1089 }
1090 return true;
1091 }
1092
1093
1094
1095
1096
1097
1098
1099 boolean asyncSetOfflineInZooKeeper(final RegionState state,
1100 final AsyncCallback.StringCallback cb, final Object ctx) {
1101 if (!state.isClosed() && !state.isOffline()) {
1102 new RuntimeException("Unexpected state trying to OFFLINE; " + state);
1103 this.master.abort("Unexpected state trying to OFFLINE; " + state,
1104 new IllegalStateException());
1105 return false;
1106 }
1107 state.update(RegionState.State.OFFLINE);
1108 try {
1109 ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
1110 master.getServerName(), cb, ctx);
1111 } catch (KeeperException e) {
1112 master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
1113 return false;
1114 }
1115 return true;
1116 }
1117
1118
1119
1120
1121
1122
1123 RegionPlan getRegionPlan(final RegionState state,
1124 final boolean forceNewPlan) {
1125 return getRegionPlan(state, null, forceNewPlan);
1126 }
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137 RegionPlan getRegionPlan(final RegionState state,
1138 final HServerInfo serverToExclude, final boolean forceNewPlan) {
1139
1140 String encodedName = state.getRegion().getEncodedName();
1141 List<HServerInfo> servers = this.serverManager.getOnlineServersList();
1142
1143
1144 if (serverToExclude != null) servers.remove(serverToExclude);
1145 if (servers.isEmpty()) return null;
1146 RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
1147 LoadBalancer.randomAssignment(servers));
1148 boolean newPlan = false;
1149 RegionPlan existingPlan = null;
1150 synchronized (this.regionPlans) {
1151 existingPlan = this.regionPlans.get(encodedName);
1152 if (forceNewPlan || existingPlan == null
1153 || existingPlan.getDestination() == null
1154 || existingPlan.getDestination().equals(serverToExclude)) {
1155 newPlan = true;
1156 this.regionPlans.put(encodedName, randomPlan);
1157 }
1158 }
1159 if (newPlan) {
1160 LOG.debug("No previous transition plan was found (or we are ignoring " +
1161 "an existing plan) for " + state.getRegion().getRegionNameAsString() +
1162 " so generated a random one; " + randomPlan + "; " +
1163 serverManager.countOfRegionServers() +
1164 " (online=" + serverManager.getOnlineServers().size() +
1165 ", exclude=" + serverToExclude + ") available servers");
1166 return randomPlan;
1167 }
1168 LOG.debug("Using pre-existing plan for region " +
1169 state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
1170 return existingPlan;
1171 }
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182 public void unassign(HRegionInfo region) {
1183 unassign(region, false);
1184 }
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196 public void unassign(HRegionInfo region, boolean force) {
1197 LOG.debug("Starting unassignment of region " +
1198 region.getRegionNameAsString() + " (offlining)");
1199 synchronized (this.regions) {
1200
1201 if (!regions.containsKey(region)) {
1202 LOG.debug("Attempted to unassign region " +
1203 region.getRegionNameAsString() + " but it is not " +
1204 "currently assigned anywhere");
1205 return;
1206 }
1207 }
1208 String encodedName = region.getEncodedName();
1209
1210 RegionState state;
1211 synchronized (regionsInTransition) {
1212 state = regionsInTransition.get(encodedName);
1213 if (state == null) {
1214 state = new RegionState(region, RegionState.State.PENDING_CLOSE);
1215 regionsInTransition.put(encodedName, state);
1216 } else if (force && state.isPendingClose()) {
1217 LOG.debug("Attempting to unassign region " +
1218 region.getRegionNameAsString() + " which is already pending close "
1219 + "but forcing an additional close");
1220 state.update(RegionState.State.PENDING_CLOSE);
1221 } else {
1222 LOG.debug("Attempting to unassign region " +
1223 region.getRegionNameAsString() + " but it is " +
1224 "already in transition (" + state.getState() + ")");
1225 return;
1226 }
1227 }
1228
1229 HServerInfo server = null;
1230 synchronized (this.regions) {
1231 server = regions.get(region);
1232 }
1233 try {
1234 if (serverManager.sendRegionClose(server, state.getRegion())) {
1235 LOG.debug("Sent CLOSE to " + server + " for region " +
1236 region.getRegionNameAsString());
1237 return;
1238 }
1239
1240 LOG.debug("Server " + server + " region CLOSE RPC returned false for " +
1241 region.getEncodedName());
1242 } catch (NotServingRegionException nsre) {
1243 LOG.info("Server " + server + " returned " + nsre + " for " +
1244 region.getEncodedName());
1245
1246
1247
1248 return;
1249 } catch (Throwable t) {
1250 if (t instanceof RemoteException) {
1251 t = ((RemoteException)t).unwrapRemoteException();
1252 if (t instanceof NotServingRegionException) {
1253 if (checkIfRegionBelongsToDisabling(region)) {
1254
1255 LOG.info("While trying to recover the table "
1256 + region.getTableDesc().getNameAsString()
1257 + " to DISABLED state the region " + region
1258 + " was offlined but the table was in DISABLING state");
1259 synchronized (this.regionsInTransition) {
1260 this.regionsInTransition.remove(region.getEncodedName());
1261 }
1262
1263 synchronized (this.regions) {
1264 this.regions.remove(region);
1265 }
1266 }
1267 }
1268 }
1269 LOG.info("Server " + server + " returned " + t + " for " +
1270 region.getEncodedName());
1271
1272 }
1273 }
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283 public void waitForAssignment(HRegionInfo regionInfo)
1284 throws InterruptedException {
1285 synchronized(regions) {
1286 while(!regions.containsKey(regionInfo)) {
1287 regions.wait();
1288 }
1289 }
1290 }
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302 public void assignRoot() throws KeeperException {
1303 RootLocationEditor.deleteRootLocation(this.master.getZooKeeper());
1304 assign(HRegionInfo.ROOT_REGIONINFO, true);
1305 }
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315 public void assignMeta() {
1316
1317 assign(HRegionInfo.FIRST_META_REGIONINFO, true);
1318 }
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329 public void assignAllUserRegions() throws IOException, InterruptedException {
1330
1331 List<HServerInfo> servers = serverManager.getOnlineServersList();
1332
1333
1334 Map<HRegionInfo,HServerAddress> allRegions =
1335 MetaReader.fullScan(catalogTracker, this.zkTable.getDisabledTables(), true);
1336 if (allRegions == null || allRegions.isEmpty()) return;
1337
1338
1339 boolean retainAssignment = master.getConfiguration().
1340 getBoolean("hbase.master.startup.retainassign", true);
1341
1342 Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
1343 if (retainAssignment) {
1344
1345 bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
1346 } else {
1347
1348 bulkPlan = LoadBalancer.roundRobinAssignment(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
1349 }
1350 LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
1351 servers.size() + " server(s), retainAssignment=" + retainAssignment);
1352
1353
1354 BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
1355 ba.bulkAssign();
1356 LOG.info("Bulk assigning done");
1357 }
1358
1359
1360
1361
1362
1363
1364
1365 static class StartupBulkAssigner extends BulkAssigner {
1366 final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
1367 final AssignmentManager assignmentManager;
1368
1369 StartupBulkAssigner(final Server server,
1370 final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
1371 final AssignmentManager am) {
1372 super(server);
1373 this.bulkPlan = bulkPlan;
1374 this.assignmentManager = am;
1375 }
1376
1377 @Override
1378 public boolean bulkAssign(boolean sync) throws InterruptedException {
1379
1380 this.assignmentManager.timeoutMonitor.bulkAssign(true);
1381 try {
1382 return super.bulkAssign(sync);
1383 } finally {
1384
1385 this.assignmentManager.timeoutMonitor.bulkAssign(false);
1386 }
1387 }
1388
1389 @Override
1390 protected String getThreadNamePrefix() {
1391 return this.server.getServerName() + "-StartupBulkAssigner";
1392 }
1393
1394 @Override
1395 protected void populatePool(java.util.concurrent.ExecutorService pool) {
1396 for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
1397 pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
1398 this.assignmentManager, true));
1399 }
1400 }
1401
1402 protected boolean waitUntilDone(final long timeout)
1403 throws InterruptedException {
1404 Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
1405 for (List<HRegionInfo> regionList : bulkPlan.values()) {
1406 regionSet.addAll(regionList);
1407 }
1408 return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet);
1409 }
1410
1411 @Override
1412 protected long getTimeoutOnRIT() {
1413
1414
1415 long perRegionOpenTimeGuesstimate =
1416 this.server.getConfiguration().getLong("hbase.bulk.assignment.perregion.open.time", 1000);
1417 int regionsPerServer =
1418 this.bulkPlan.entrySet().iterator().next().getValue().size();
1419 long timeout = perRegionOpenTimeGuesstimate * regionsPerServer;
1420 LOG.debug("Timeout-on-RIT=" + timeout);
1421 return timeout;
1422 }
1423 }
1424
1425
1426
1427
1428
1429 static class GeneralBulkAssigner extends StartupBulkAssigner {
1430 GeneralBulkAssigner(final Server server,
1431 final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
1432 final AssignmentManager am) {
1433 super(server, bulkPlan, am);
1434 }
1435
1436 @Override
1437 protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
1438 return new UncaughtExceptionHandler() {
1439 @Override
1440 public void uncaughtException(Thread t, Throwable e) {
1441 LOG.warn("Assigning regions in " + t.getName(), e);
1442 }
1443 };
1444 }
1445 }
1446
1447
1448
1449
1450 static class SingleServerBulkAssigner implements Runnable {
1451 private final HServerInfo regionserver;
1452 private final List<HRegionInfo> regions;
1453 private final AssignmentManager assignmentManager;
1454
1455 SingleServerBulkAssigner(final HServerInfo regionserver,
1456 final List<HRegionInfo> regions, final AssignmentManager am,
1457 final boolean startUp) {
1458 this.regionserver = regionserver;
1459 this.regions = regions;
1460 this.assignmentManager = am;
1461 }
1462 @Override
1463 public void run() {
1464 this.assignmentManager.assign(this.regionserver, this.regions);
1465 }
1466 }
1467
1468
1469
1470
1471
1472
1473
1474 boolean waitUntilNoRegionsInTransition(final long timeout)
1475 throws InterruptedException {
1476
1477
1478
1479
1480
1481
1482 long startTime = System.currentTimeMillis();
1483 long remaining = timeout;
1484 synchronized (regionsInTransition) {
1485 while (regionsInTransition.size() > 0 && !this.master.isStopped()
1486 && remaining > 0) {
1487 regionsInTransition.wait(remaining);
1488 remaining = timeout - (System.currentTimeMillis() - startTime);
1489 }
1490 }
1491 return regionsInTransition.isEmpty();
1492 }
1493
1494
1495
1496
1497
1498
1499
1500
1501 boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
1502 throws InterruptedException {
1503
1504 long startTime = System.currentTimeMillis();
1505 long remaining = timeout;
1506 boolean stillInTransition = true;
1507 synchronized (regionsInTransition) {
1508 while (regionsInTransition.size() > 0 && !this.master.isStopped() &&
1509 remaining > 0 && stillInTransition) {
1510 int count = 0;
1511 for (RegionState rs : regionsInTransition.values()) {
1512 if (regions.contains(rs.getRegion())) {
1513 count++;
1514 break;
1515 }
1516 }
1517 if (count == 0) {
1518 stillInTransition = false;
1519 break;
1520 }
1521 regionsInTransition.wait(remaining);
1522 remaining = timeout - (System.currentTimeMillis() - startTime);
1523 }
1524 }
1525 return stillInTransition;
1526 }
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538 private Map<String, List<Pair<HRegionInfo,Result>>> rebuildUserRegions()
1539 throws IOException, KeeperException {
1540
1541 List<Result> results = MetaReader.fullScanOfResults(catalogTracker);
1542
1543 Map<String, List<Pair<HRegionInfo,Result>>> offlineServers =
1544 new TreeMap<String, List<Pair<HRegionInfo,Result>>>();
1545
1546 Set<String> disablingTables = new HashSet<String>(1);
1547
1548 for (Result result : results) {
1549 Pair<HRegionInfo,HServerInfo> region =
1550 MetaReader.metaRowToRegionPairWithInfo(result);
1551 if (region == null) continue;
1552 HServerInfo regionLocation = region.getSecond();
1553 HRegionInfo regionInfo = region.getFirst();
1554 String disablingTableName = regionInfo.getTableDesc().getNameAsString();
1555 if (regionLocation == null) {
1556
1557
1558
1559 if (false == checkIfRegionBelongsToDisabled(regionInfo)) {
1560 this.regions.put(regionInfo, null);
1561 }
1562 if (checkIfRegionBelongsToDisabling(regionInfo)) {
1563 disablingTables.add(disablingTableName);
1564 }
1565 } else if (!serverManager.isServerOnline(regionLocation.getServerName())) {
1566
1567 List<Pair<HRegionInfo,Result>> offlineRegions =
1568 offlineServers.get(regionLocation.getServerName());
1569 if (offlineRegions == null) {
1570 offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
1571 offlineServers.put(regionLocation.getServerName(), offlineRegions);
1572 }
1573 offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
1574 } else {
1575
1576
1577 if (false == checkIfRegionBelongsToDisabled(regionInfo)) {
1578 regions.put(regionInfo, regionLocation);
1579 addToServers(regionLocation, regionInfo);
1580 }
1581 if (checkIfRegionBelongsToDisabling(regionInfo)) {
1582 disablingTables.add(disablingTableName);
1583 }
1584 }
1585 }
1586
1587
1588
1589 if (disablingTables.size() != 0) {
1590
1591 ZKUtil.listChildrenAndWatchForNewChildren(watcher,
1592 watcher.assignmentZNode);
1593 for (String tableName : disablingTables) {
1594
1595 LOG.info("The table " + tableName
1596 + " is in DISABLING state. Hence recovering by moving the table"
1597 + " to DISABLED state.");
1598 new DisableTableHandler(this.master, tableName.getBytes(),
1599 catalogTracker, this).process();
1600 }
1601 }
1602 return offlineServers;
1603 }
1604
1605 private boolean checkIfRegionBelongsToDisabled(HRegionInfo regionInfo) {
1606 String tableName = regionInfo.getTableDesc().getNameAsString();
1607 return getZKTable().isDisabledTable(tableName);
1608 }
1609
1610 private boolean checkIfRegionBelongsToDisabling(HRegionInfo regionInfo) {
1611 String tableName = regionInfo.getTableDesc().getNameAsString();
1612 return getZKTable().isDisablingTable(tableName);
1613 }
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628 private void processDeadServers(
1629 Map<String, List<Pair<HRegionInfo, Result>>> deadServers)
1630 throws IOException, KeeperException {
1631 for (Map.Entry<String, List<Pair<HRegionInfo,Result>>> deadServer :
1632 deadServers.entrySet()) {
1633 List<Pair<HRegionInfo,Result>> regions = deadServer.getValue();
1634 for (Pair<HRegionInfo,Result> region : regions) {
1635 HRegionInfo regionInfo = region.getFirst();
1636 Result result = region.getSecond();
1637
1638 try {
1639
1640 boolean assign =
1641 ServerShutdownHandler.processDeadRegion(regionInfo, result, this,
1642 this.catalogTracker);
1643 if (assign) {
1644 ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
1645 master.getServerName());
1646 }
1647 } catch (KeeperException.NoNodeException nne) {
1648
1649 }
1650 }
1651 }
1652 }
1653
1654
1655
1656
1657
1658
1659 private void addToServers(final HServerInfo hsi, final HRegionInfo hri) {
1660 Set<HRegionInfo> hris = servers.get(hsi);
1661 if (hris == null) {
1662 hris = new ConcurrentSkipListSet<HRegionInfo>();
1663 servers.put(hsi, hris);
1664 }
1665 if (!hris.contains(hri)) hris.add(hri);
1666 }
1667
1668
1669
1670
1671 public NavigableMap<String, RegionState> getRegionsInTransition() {
1672 synchronized (this.regionsInTransition) {
1673 return new TreeMap<String, RegionState>(this.regionsInTransition);
1674 }
1675 }
1676
1677
1678
1679
1680 public boolean isRegionsInTransition() {
1681 synchronized (this.regionsInTransition) {
1682 return !this.regionsInTransition.isEmpty();
1683 }
1684 }
1685
1686
1687
1688
1689
1690
1691 public RegionState isRegionInTransition(final HRegionInfo hri) {
1692 synchronized (this.regionsInTransition) {
1693 return this.regionsInTransition.get(hri.getEncodedName());
1694 }
1695 }
1696
1697
1698
1699
1700
1701
1702
1703 public void clearRegionFromTransition(HRegionInfo hri) {
1704 synchronized (this.regionsInTransition) {
1705 this.regionsInTransition.remove(hri.getEncodedName());
1706 }
1707 synchronized (this.regions) {
1708 this.regions.remove(hri);
1709 for (Set<HRegionInfo> regions : this.servers.values()) {
1710 regions.remove(hri);
1711 }
1712 }
1713 clearRegionPlan(hri);
1714 }
1715
1716
1717
1718
1719 void clearRegionPlan(final HRegionInfo region) {
1720 synchronized (this.regionPlans) {
1721 this.regionPlans.remove(region.getEncodedName());
1722 }
1723 }
1724
1725
1726
1727
1728
1729
1730 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1731 throws IOException {
1732 if (isRegionInTransition(hri) == null) return;
1733 RegionState rs = null;
1734
1735
1736 while(!this.master.isStopped() && (rs = isRegionInTransition(hri)) != null) {
1737 Threads.sleep(1000);
1738 LOG.info("Waiting on " + rs + " to clear regions-in-transition");
1739 }
1740 if (this.master.isStopped()) {
1741 LOG.info("Giving up wait on regions in " +
1742 "transition because stoppable.isStopped is set");
1743 }
1744 }
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757 public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
1758 List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
1759 HRegionInfo boundary =
1760 new HRegionInfo(new HTableDescriptor(tableName), null, null);
1761 synchronized (this.regions) {
1762 for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
1763 if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
1764 tableRegions.add(regionInfo);
1765 } else {
1766 break;
1767 }
1768 }
1769 }
1770 return tableRegions;
1771 }
1772
1773
1774
1775
1776 public class TimeoutMonitor extends Chore {
1777 private final int timeout;
1778 private boolean bulkAssign = false;
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789 public TimeoutMonitor(final int period, final Stoppable stopper,
1790 final int timeout) {
1791 super("AssignmentTimeoutMonitor", period, stopper);
1792 this.timeout = timeout;
1793 }
1794
1795
1796
1797
1798
1799
1800 public boolean bulkAssign(final boolean bulkAssign) {
1801 boolean result = this.bulkAssign;
1802 this.bulkAssign = bulkAssign;
1803 return result;
1804 }
1805
1806 @Override
1807 protected void chore() {
1808
1809 if (this.bulkAssign) return;
1810 List<HRegionInfo> unassigns = new ArrayList<HRegionInfo>();
1811 Map<HRegionInfo, Boolean> assigns =
1812 new HashMap<HRegionInfo, Boolean>();
1813 synchronized (regionsInTransition) {
1814
1815 long now = System.currentTimeMillis();
1816 for (RegionState regionState : regionsInTransition.values()) {
1817 if (regionState.getStamp() + timeout <= now) {
1818 HRegionInfo regionInfo = regionState.getRegion();
1819 LOG.info("Regions in transition timed out: " + regionState);
1820
1821 switch (regionState.getState()) {
1822 case CLOSED:
1823 LOG.info("Region " + regionInfo.getEncodedName() +
1824 " has been CLOSED for too long, waiting on queued " +
1825 "ClosedRegionHandler to run or server shutdown");
1826
1827 synchronized(regionState) {
1828 regionState.update(regionState.getState());
1829 }
1830 break;
1831 case OFFLINE:
1832 LOG.info("Region has been OFFLINE for too long, " +
1833 "reassigning " + regionInfo.getRegionNameAsString() +
1834 " to a random server");
1835 assigns.put(regionState.getRegion(), Boolean.FALSE);
1836 break;
1837 case PENDING_OPEN:
1838 LOG.info("Region has been PENDING_OPEN for too " +
1839 "long, reassigning region=" +
1840 regionInfo.getRegionNameAsString());
1841 assigns.put(regionState.getRegion(), Boolean.TRUE);
1842 break;
1843 case OPENING:
1844 LOG.info("Region has been OPENING for too " +
1845 "long, reassigning region=" +
1846 regionInfo.getRegionNameAsString());
1847
1848 try {
1849 String node = ZKAssign.getNodeName(watcher,
1850 regionInfo.getEncodedName());
1851 Stat stat = new Stat();
1852 RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
1853 node, stat);
1854 if (data == null) {
1855 LOG.warn("Data is null, node " + node + " no longer exists");
1856 break;
1857 }
1858 if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
1859 LOG.debug("Region has transitioned to OPENED, allowing " +
1860 "watched event handlers to process");
1861 break;
1862 } else if (data.getEventType() !=
1863 EventType.RS_ZK_REGION_OPENING) {
1864 LOG.warn("While timing out a region in state OPENING, " +
1865 "found ZK node in unexpected state: " +
1866 data.getEventType());
1867 break;
1868 }
1869
1870 try {
1871 data = new RegionTransitionData(
1872 EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
1873 master.getServerName());
1874 if (ZKUtil.setData(watcher, node, data.getBytes(),
1875 stat.getVersion())) {
1876
1877 ZKUtil.getDataAndWatch(watcher, node);
1878 LOG.info("Successfully transitioned region=" +
1879 regionInfo.getRegionNameAsString() + " into OFFLINE" +
1880 " and forcing a new assignment");
1881 assigns.put(regionState.getRegion(), Boolean.TRUE);
1882 }
1883 } catch (KeeperException.NoNodeException nne) {
1884
1885 }
1886 } catch (KeeperException ke) {
1887 LOG.error("Unexpected ZK exception timing out CLOSING region",
1888 ke);
1889 break;
1890 }
1891 break;
1892 case OPEN:
1893 LOG.error("Region has been OPEN for too long, " +
1894 "we don't know where region was opened so can't do anything");
1895 synchronized(regionState) {
1896 regionState.update(regionState.getState());
1897 }
1898 break;
1899
1900 case PENDING_CLOSE:
1901 LOG.info("Region has been PENDING_CLOSE for too " +
1902 "long, running forced unassign again on region=" +
1903 regionInfo.getRegionNameAsString());
1904 try {
1905
1906
1907 if (!ZKUtil.watchAndCheckExists(watcher,
1908 ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
1909
1910
1911 unassigns.add(regionInfo);
1912 }
1913 } catch (NoNodeException e) {
1914 LOG.debug("Node no longer existed so not forcing another " +
1915 "unassignment");
1916 } catch (KeeperException e) {
1917 LOG.warn("Unexpected ZK exception timing out a region " +
1918 "close", e);
1919 }
1920 break;
1921 case CLOSING:
1922 LOG.info("Region has been CLOSING for too " +
1923 "long, this should eventually complete or the server will " +
1924 "expire, doing nothing");
1925 break;
1926 }
1927 }
1928 }
1929 }
1930
1931 for (HRegionInfo hri: unassigns) {
1932 unassign(hri, true);
1933 }
1934 for (Map.Entry<HRegionInfo, Boolean> e: assigns.entrySet()){
1935 assign(e.getKey(), false, e.getValue());
1936 }
1937 }
1938 }
1939
1940
1941
1942
1943
1944
1945 public List<RegionState> processServerShutdown(final HServerInfo hsi) {
1946
1947 synchronized (this.regionPlans) {
1948 for (Iterator <Map.Entry<String, RegionPlan>> i =
1949 this.regionPlans.entrySet().iterator(); i.hasNext();) {
1950 Map.Entry<String, RegionPlan> e = i.next();
1951 HServerInfo otherHsi = e.getValue().getDestination();
1952
1953 if (otherHsi != null && otherHsi.equals(hsi)) {
1954
1955 i.remove();
1956 }
1957 }
1958 }
1959
1960
1961
1962 Set<HRegionInfo> deadRegions = null;
1963 List<RegionState> rits = new ArrayList<RegionState>();
1964 synchronized (this.regions) {
1965 Set<HRegionInfo> assignedRegions = this.servers.remove(hsi);
1966 if (assignedRegions == null || assignedRegions.isEmpty()) {
1967
1968 return rits;
1969 }
1970 deadRegions = new TreeSet<HRegionInfo>(assignedRegions);
1971 for (HRegionInfo region : deadRegions) {
1972 this.regions.remove(region);
1973 }
1974 }
1975
1976
1977
1978 synchronized (regionsInTransition) {
1979 for (RegionState region : this.regionsInTransition.values()) {
1980 if (deadRegions.remove(region.getRegion())) {
1981 rits.add(region);
1982 }
1983 }
1984 }
1985 return rits;
1986 }
1987
1988
1989
1990
1991
1992
1993
1994
1995 public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
1996 final HRegionInfo a, final HRegionInfo b) {
1997 regionOffline(parent);
1998
1999
2000
2001 try {
2002 RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher,
2003 parent.getEncodedName(), null);
2004 if (node != null) {
2005 if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) {
2006 ZKAssign.deleteClosingNode(this.watcher, parent);
2007 } else {
2008 LOG.warn("Split report has RIT node (shouldnt have one): " +
2009 parent + " node: " + node);
2010 }
2011 }
2012 } catch (KeeperException e) {
2013 LOG.warn("Exception while validating RIT during split report", e);
2014 }
2015 synchronized (this.regions) {
2016
2017 HServerInfo hsia = this.regions.get(a);
2018 if (hsia != null){
2019 LOG.warn("Trying to process the split of " +a.getEncodedName()+ ", " +
2020 "but it was already done and one daughter is on region server " + hsia);
2021 return;
2022 }
2023 }
2024
2025 regionOnline(a, hsi);
2026 regionOnline(b, hsi);
2027
2028
2029
2030
2031
2032 if (this.zkTable.isDisablingOrDisabledTable(
2033 parent.getTableDesc().getNameAsString())) {
2034 unassign(a);
2035 unassign(b);
2036 }
2037 }
2038
2039
2040
2041
2042
2043
2044 Map<HServerInfo, List<HRegionInfo>> getAssignments() {
2045
2046
2047
2048
2049 Map<HServerInfo, List<HRegionInfo>> result = null;
2050 synchronized (this.regions) {
2051 result = new HashMap<HServerInfo, List<HRegionInfo>>(this.servers.size());
2052 for (Map.Entry<HServerInfo, Set<HRegionInfo>> e: this.servers.entrySet()) {
2053 List<HRegionInfo> shallowCopy = new ArrayList<HRegionInfo>(e.getValue());
2054 HServerInfo clone = new HServerInfo(e.getKey());
2055
2056
2057 clone.getLoad().setNumberOfRegions(e.getValue().size());
2058 result.put(clone, shallowCopy);
2059 }
2060 }
2061 return result;
2062 }
2063
2064
2065
2066
2067
2068
2069 Pair<HRegionInfo, HServerInfo> getAssignment(final byte [] encodedRegionName) {
2070 String name = Bytes.toString(encodedRegionName);
2071 synchronized(this.regions) {
2072 for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
2073 if (e.getKey().getEncodedName().equals(name)) {
2074 return new Pair<HRegionInfo, HServerInfo>(e.getKey(), e.getValue());
2075 }
2076 }
2077 }
2078 return null;
2079 }
2080
2081
2082
2083
2084 void balance(final RegionPlan plan) {
2085 synchronized (this.regionPlans) {
2086 this.regionPlans.put(plan.getRegionName(), plan);
2087 }
2088 unassign(plan.getRegionInfo());
2089 }
2090
2091
2092
2093
2094
2095
2096
2097
2098 void bulkAssignUserRegions(final HRegionInfo [] regions,
2099 final List<HServerInfo> servers, final boolean sync)
2100 throws IOException {
2101 Map<HServerInfo, List<HRegionInfo>> bulkPlan =
2102 LoadBalancer.roundRobinAssignment(java.util.Arrays.asList(regions), servers);
2103 LOG.info("Bulk assigning " + regions.length + " region(s) " +
2104 "round-robin across " + servers.size() + " server(s)");
2105
2106 BulkAssigner ba = new GeneralBulkAssigner(this.master, bulkPlan, this);
2107 try {
2108 ba.bulkAssign(sync);
2109 } catch (InterruptedException e) {
2110 throw new IOException("InterruptedException bulk assigning", e);
2111 }
2112 LOG.info("Bulk assigning done");
2113 }
2114
2115
2116
2117
2118 public static class RegionState implements Writable {
2119 private HRegionInfo region;
2120
2121 public enum State {
2122 OFFLINE,
2123 PENDING_OPEN,
2124 OPENING,
2125 OPEN,
2126 PENDING_CLOSE,
2127 CLOSING,
2128 CLOSED
2129 }
2130
2131 private State state;
2132 private long stamp;
2133
2134 public RegionState() {}
2135
2136 RegionState(HRegionInfo region, State state) {
2137 this(region, state, System.currentTimeMillis());
2138 }
2139
2140 RegionState(HRegionInfo region, State state, long stamp) {
2141 this.region = region;
2142 this.state = state;
2143 this.stamp = stamp;
2144 }
2145
2146 public void update(State state, long stamp) {
2147 this.state = state;
2148 this.stamp = stamp;
2149 }
2150
2151 public void update(State state) {
2152 this.state = state;
2153 this.stamp = System.currentTimeMillis();
2154 }
2155
2156 public State getState() {
2157 return state;
2158 }
2159
2160 public long getStamp() {
2161 return stamp;
2162 }
2163
2164 public HRegionInfo getRegion() {
2165 return region;
2166 }
2167
2168 public boolean isClosing() {
2169 return state == State.CLOSING;
2170 }
2171
2172 public boolean isClosed() {
2173 return state == State.CLOSED;
2174 }
2175
2176 public boolean isPendingClose() {
2177 return state == State.PENDING_CLOSE;
2178 }
2179
2180 public boolean isOpening() {
2181 return state == State.OPENING;
2182 }
2183
2184 public boolean isOpened() {
2185 return state == State.OPEN;
2186 }
2187
2188 public boolean isPendingOpen() {
2189 return state == State.PENDING_OPEN;
2190 }
2191
2192 public boolean isOffline() {
2193 return state == State.OFFLINE;
2194 }
2195
2196 @Override
2197 public String toString() {
2198 return region.getRegionNameAsString() + " state=" + state +
2199 ", ts=" + stamp;
2200 }
2201
2202 @Override
2203 public void readFields(DataInput in) throws IOException {
2204 region = new HRegionInfo();
2205 region.readFields(in);
2206 state = State.valueOf(in.readUTF());
2207 stamp = in.readLong();
2208 }
2209
2210 @Override
2211 public void write(DataOutput out) throws IOException {
2212 region.write(out);
2213 out.writeUTF(state.name());
2214 out.writeLong(stamp);
2215 }
2216 }
2217
2218 public void stop() {
2219 this.timeoutMonitor.interrupt();
2220 }
2221
2222
2223
2224
2225 public boolean isServerOnline(String serverName) {
2226 return this.serverManager.isServerOnline(serverName);
2227 }
2228 }