/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;

/**
 * Manages and performs region assignment.
 * <p>
 * Monitors ZooKeeper for events related to regions in transition.
 * <p>
 * Handles existing regions in transition during master failover.
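 * <p>
 * Illustrative startup sequence as driven from the master (a sketch only;
 * the surrounding master wiring is assumed, not shown):
 * <pre>
 *   AssignmentManager am = new AssignmentManager(master, serverManager,
 *     catalogTracker, executorService);
 *   am.cleanoutUnassigned();     // fresh startup: clear unassigned znodes
 *   am.assignRoot();             // catalog regions first
 *   am.assignMeta();
 *   am.assignAllUserRegions();   // then bulk assign user regions
 * </pre>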
 */
public class AssignmentManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  protected Server master;

  private ServerManager serverManager;

  private CatalogTracker catalogTracker;

  private TimeoutMonitor timeoutMonitor;

  /*
   * Maximum number of times we retry an assignment.  See the retry loop in
   * {@link #assign}.
   */
  private final int maximumAssignmentAttempts;

  /**
   * Regions currently in transition.  Map of encoded region names to the master
   * in-memory state for that region.
   */
  final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
    new ConcurrentSkipListMap<String, RegionState>();

  /** Plans for region movement. Key is the encoded version of a region name. */
  // TODO: When do plans get cleaned out?  Ever? In server open and in server
  // shutdown processing -- St.Ack
  // All access to this Map must be synchronized.
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  private final ZKTable zkTable;

  /**
   * Server to regions assignment map.
   * Contains the set of regions currently assigned to a given server.
   * This Map and {@link #regions} are tied.  Always update this in tandem
   * with the other under a lock on {@link #regions}.
   * @see #regions
   */
  private final NavigableMap<HServerInfo, Set<HRegionInfo>> servers =
    new TreeMap<HServerInfo, Set<HRegionInfo>>();

  /**
   * Region to server assignment map.
   * Contains the server a given region is currently assigned to.
   * This Map and {@link #servers} are tied.  Always update this in tandem
   * with the other under a lock on {@link #regions}.
   * @see #servers
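   * <p>A sketch of the update discipline both maps follow (see e.g.
   * {@link #regionOnline(HRegionInfo, HServerInfo)}; <code>addToServers</code>
   * is the helper that maintains {@link #servers}):
   * <pre>
   *   synchronized (this.regions) {
   *     this.regions.put(regionInfo, serverInfo);
   *     addToServers(serverInfo, regionInfo);  // keep the two maps in step
   *   }
   * </pre>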
   */
  private final SortedMap<HRegionInfo,HServerInfo> regions =
    new TreeMap<HRegionInfo,HServerInfo>();

  private final ExecutorService executorService;

  /**
   * Constructs a new assignment manager.
   *
   * @param master
   * @param serverManager
   * @param catalogTracker
   * @param service
   * @throws KeeperException
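   * <p>An illustrative snippet of the timeout-monitor tunables read here,
   * shown with their default values (the keys are the ones used below;
   * setting them explicitly is optional):
   * <pre>
   *   Configuration conf = master.getConfiguration();
   *   conf.setInt("hbase.master.assignment.timeoutmonitor.period", 10000);
   *   conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000);
   * </pre>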
   */
  public AssignmentManager(Server master, ServerManager serverManager,
      CatalogTracker catalogTracker, final ExecutorService service)
  throws KeeperException {
    super(master.getZooKeeper());
    this.master = master;
    this.serverManager = serverManager;
    this.catalogTracker = catalogTracker;
    this.executorService = service;
    Configuration conf = master.getConfiguration();
    this.timeoutMonitor = new TimeoutMonitor(
      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
      master,
      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
    Threads.setDaemonThreadRunning(timeoutMonitor,
      master.getServerName() + ".timeoutMonitor");
    this.zkTable = new ZKTable(this.master.getZooKeeper());
    this.maximumAssignmentAttempts =
      this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
  }

  /**
   * @return Instance of ZKTable.
   */
  public ZKTable getZKTable() {
    // These are 'expensive' to make, involving a trip to the zk ensemble, so
    // allow sharing.
    return this.zkTable;
  }

  /**
   * Reset all unassigned znodes.  Called on startup of master.
   * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
   * @throws IOException
   * @throws KeeperException
   */
  void cleanoutUnassigned() throws IOException, KeeperException {
    // Cleanup any existing ZK nodes and start watching
    ZKAssign.deleteAllNodes(watcher);
    ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
      this.watcher.assignmentZNode);
  }

  /**
   * Handle failover.  Restore state from META and ZK.  Handle any regions in
   * transition.  Presumes <code>.META.</code> and <code>-ROOT-</code> deployed.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  void processFailover() throws KeeperException, IOException, InterruptedException {
    // Concurrency note: In the below the accesses on regionsInTransition are
    // outside of a synchronization block where usually all accesses to RIT are
    // synchronized.  The presumption is that in this case it is safe since this
    // method is being played by a single thread on startup.

    // TODO: Check list of user regions and their assignments against regionservers.
    // TODO: Regions that have a null location and are not in regionsInTransition
    // need to be handled.

    // Add -ROOT- and .META. to the regions map.  They must be deployed if we
    // got this far.  Caller takes care of it.
    HServerInfo hsi =
      this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation());
    regionOnline(HRegionInfo.FIRST_META_REGIONINFO, hsi);
    hsi = this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation());
    regionOnline(HRegionInfo.ROOT_REGIONINFO, hsi);

    // Scan META to build list of existing regions, servers, and assignment.
    // Returns servers who have not checked in (assumed dead) and their regions.
    Map<String, List<Pair<HRegionInfo, Result>>> deadServers =
      rebuildUserRegions();
    // Process list of dead servers; note this will add regions to the RIT.
    // processRegionsInTransition will read them and assign them out.
    processDeadServers(deadServers);
    // Check existing regions in transition
    processRegionsInTransition(deadServers);
  }

  /**
   * Process all regions that are in transition up in zookeeper.  Used by
   * master joining an already running cluster.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  void processRegionsInTransition()
  throws KeeperException, IOException, InterruptedException {
    // Pass null to signify no dead servers in this context.
    processRegionsInTransition(null);
  }

  /**
   * Process all regions that are in transition up in zookeeper.  Used by
   * master joining an already running cluster.
   * @param deadServers Map of dead servers and their regions.  Can be null.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  void processRegionsInTransition(
      final Map<String, List<Pair<HRegionInfo, Result>>> deadServers)
  throws KeeperException, IOException, InterruptedException {
    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
        watcher.assignmentZNode);
    if (nodes.isEmpty()) {
      LOG.info("No regions in transition in ZK to process on failover");
      return;
    }
    LOG.info("Failed-over master needs to process " + nodes.size() +
        " regions in transition");
    for (String encodedRegionName: nodes) {
      processRegionInTransition(encodedRegionName, null, deadServers);
    }
  }

  /**
   * If region is up in zk in transition, then do fixup and block and wait until
   * the region is assigned and out of transition.  Used on startup for
   * catalog regions.
   * @param hri Region to look for.
   * @return True if we processed a region in transition else false if region
   * was not up in zk in transition.
   * @throws InterruptedException
   * @throws KeeperException
   * @throws IOException
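   * <p>Typical startup usage for the catalog regions (sketch):
   * <pre>
   *   am.processRegionInTransitionAndBlockUntilAssigned(
   *     HRegionInfo.ROOT_REGIONINFO);
   *   am.processRegionInTransitionAndBlockUntilAssigned(
   *     HRegionInfo.FIRST_META_REGIONINFO);
   * </pre>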
   */
  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
  throws InterruptedException, KeeperException, IOException {
    boolean inTransition =
      processRegionInTransition(hri.getEncodedName(), hri, null);
    if (!inTransition) return inTransition;
    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
    synchronized(this.regionsInTransition) {
      while (!this.master.isStopped() &&
          this.regionsInTransition.containsKey(hri.getEncodedName())) {
        this.regionsInTransition.wait();
      }
    }
    return inTransition;
  }

  /**
   * Process failover of new master for region <code>encodedRegionName</code>
   * up in zookeeper.
   * @param encodedRegionName Region to process failover for.
   * @param regionInfo RegionInfo.  If null we'll go get it from the meta table.
   * @param deadServers Can be null.
   * @return True if the region was in transition up in zookeeper, else false.
   * @throws KeeperException
   * @throws IOException
   */
  boolean processRegionInTransition(final String encodedRegionName,
      final HRegionInfo regionInfo,
      final Map<String, List<Pair<HRegionInfo,Result>>> deadServers)
  throws KeeperException, IOException {
    RegionTransitionData data = ZKAssign.getData(watcher, encodedRegionName);
    if (data == null) return false;
    HRegionInfo hri = regionInfo;
    if (hri == null) {
      Pair<HRegionInfo, HServerAddress> p =
        MetaReader.getRegion(catalogTracker, data.getRegionName());
      if (p == null) return false;
      hri = p.getFirst();
    }
    processRegionsInTransition(data, hri, deadServers);
    return true;
  }

  void processRegionsInTransition(final RegionTransitionData data,
      final HRegionInfo regionInfo,
      final Map<String, List<Pair<HRegionInfo,Result>>> deadServers)
  throws KeeperException {
    String encodedRegionName = regionInfo.getEncodedName();
    LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
      " in state " + data.getEventType());
    synchronized (regionsInTransition) {
      switch (data.getEventType()) {
      case RS_ZK_REGION_CLOSING:
        if (isOnDeadServer(regionInfo, deadServers)) {
          // If it was on a dead server, it's closed now.  Force to OFFLINE and
          // this will get it reassigned if appropriate
          forceOffline(regionInfo, data);
        } else {
          // Just insert region into RIT.
          // If this never updates, the timeout will trigger new assignment
          regionsInTransition.put(encodedRegionName, new RegionState(
            regionInfo, RegionState.State.CLOSING, data.getStamp()));
        }
        break;

      case RS_ZK_REGION_CLOSED:
        // Region is closed, insert into RIT and handle it
        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
        break;

      case M_ZK_REGION_OFFLINE:
        // Region is offline, insert into RIT and handle it like a closed
        addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
        break;

      case RS_ZK_REGION_OPENING:
        // TODO: Could check if it was on deadServers.  If it was, then we could
        // do what happens in TimeoutMonitor when it sees this condition.

        // Just insert region into RIT.
        // If this never updates, the timeout will trigger new assignment
        regionsInTransition.put(encodedRegionName, new RegionState(
            regionInfo, RegionState.State.OPENING, data.getStamp()));
        break;

      case RS_ZK_REGION_OPENED:
        // Region is opened, insert into RIT and handle it
        regionsInTransition.put(encodedRegionName, new RegionState(
            regionInfo, RegionState.State.OPEN, data.getStamp()));
        String sn = data.getServerName();
        // sn could be null if this server is no longer online.  If that's the
        // case, just let this RIT time out; the region will be assigned to a
        // new server then.
        if (sn == null) {
          LOG.warn("Region in transition " + regionInfo.getEncodedName() +
            " references a server no longer up " + data.getServerName() +
            "; letting RIT timeout so will be assigned elsewhere");
          break;
        }
        if (isOnDeadServer(regionInfo, deadServers)) {
          // If it was on a dead server, then it's not open any more; needs
          // handling.
          forceOffline(regionInfo, data);
        } else {
          HServerInfo hsi = this.serverManager.getServerInfo(sn);
          if (hsi == null) {
            LOG.info("Failed to find " + sn +
              " in list of online servers; skipping registration of open of " +
              regionInfo.getRegionNameAsString());
          } else {
            new OpenedRegionHandler(master, this, regionInfo, hsi).process();
          }
        }
        break;
      }
    }
  }

  /**
   * Put the region <code>hri</code> into an offline state up in zk.
   * @param hri
   * @param oldData
   * @throws KeeperException
   */
  private void forceOffline(final HRegionInfo hri,
      final RegionTransitionData oldData)
  throws KeeperException {
    // If it was on a dead server, it's closed now.  Force to OFFLINE and then
    // handle it like a close; this will get it reassigned if appropriate
    LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
      oldData.getEventType() + " was on deadserver; forcing offline");
    ZKAssign.createOrForceNodeOffline(this.watcher, hri,
      this.master.getServerName());
    addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
  }

  /**
   * Add to the in-memory copy of regions in transition and then call the close
   * handler on passed region <code>hri</code>.
   * @param hri
   * @param state
   * @param oldData
   */
  private void addToRITandCallClose(final HRegionInfo hri,
      final RegionState.State state, final RegionTransitionData oldData) {
    this.regionsInTransition.put(hri.getEncodedName(),
      new RegionState(hri, state, oldData.getStamp()));
    new ClosedRegionHandler(this.master, this, hri).process();
  }

  /**
   * @param regionInfo
   * @param deadServers Map of deadServers and the regions they were carrying;
   * can be null.
   * @return True if the passed regionInfo is in the passed map of deadServers.
   */
  private boolean isOnDeadServer(final HRegionInfo regionInfo,
      final Map<String, List<Pair<HRegionInfo, Result>>> deadServers) {
    if (deadServers == null) return false;
    for (Map.Entry<String, List<Pair<HRegionInfo, Result>>> deadServer:
        deadServers.entrySet()) {
      for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
        if (e.getFirst().equals(regionInfo)) return true;
      }
    }
    return false;
  }

  /**
   * Handles various states an unassigned node can be in.
   * <p>
   * Method is called when a state change is suspected for an unassigned node.
   * <p>
   * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
   * yet).
   * @param data
   */
  private void handleRegion(final RegionTransitionData data) {
    synchronized(regionsInTransition) {
      if (data == null || data.getServerName() == null) {
        LOG.warn("Unexpected NULL input " + data);
        return;
      }
      // Check if this is a special HBCK transition
      if (data.getServerName().equals(HConstants.HBCK_CODE_NAME)) {
        handleHBCK(data);
        return;
      }
      // Verify this is a known server
      if (!serverManager.isServerOnline(data.getServerName()) &&
          !this.master.getServerName().equals(data.getServerName())) {
        LOG.warn("Attempted to handle region transition but server " +
          data.getServerName() + " is not online; region=" +
          Bytes.toString(data.getRegionName()));
        return;
      }
      String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
      String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
      // Printing if the event was created a long time ago helps debugging
      boolean lateEvent = data.getStamp() <
          (System.currentTimeMillis() - 15000);
      LOG.debug("Handling transition=" + data.getEventType() +
        ", server=" + data.getServerName() + ", region=" +
          prettyPrintedRegionName +
          (lateEvent? ", which is more than 15 seconds late" : ""));
      RegionState regionState = regionsInTransition.get(encodedName);
      switch (data.getEventType()) {
        case M_ZK_REGION_OFFLINE:
          // Nothing to do.
          break;

        case RS_ZK_REGION_CLOSING:
          // Should see CLOSING after we have asked it to CLOSE or additional
          // times after already being in state of CLOSING
          if (regionState == null ||
              (!regionState.isPendingClose() && !regionState.isClosing())) {
            LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
              " from server " + data.getServerName() + " but region was in " +
              "the state " + regionState + " and not " +
              "in expected PENDING_CLOSE or CLOSING states");
            return;
          }
          // Transition to CLOSING (or update stamp if already CLOSING)
          regionState.update(RegionState.State.CLOSING, data.getStamp());
          break;

        case RS_ZK_REGION_CLOSED:
          // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
          if (regionState == null ||
              (!regionState.isPendingClose() && !regionState.isClosing())) {
            LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
                " from server " + data.getServerName() + " but region was in " +
                "the state " + regionState + " and not " +
                "in expected PENDING_CLOSE or CLOSING states");
            return;
          }
          // Handle CLOSED by assigning elsewhere or stopping if a disable.
          // If we got here all is good.  Need to update RegionState -- else
          // what follows will fail because not in expected state.
          regionState.update(RegionState.State.CLOSED, data.getStamp());
          this.executorService.submit(new ClosedRegionHandler(master,
            this, regionState.getRegion()));
          break;

        case RS_ZK_REGION_OPENING:
          // Should see OPENING after we have asked it to OPEN or additional
          // times after already being in state of OPENING
          if (regionState == null ||
              (!regionState.isPendingOpen() && !regionState.isOpening())) {
            LOG.warn("Received OPENING for region " +
                prettyPrintedRegionName +
                " from server " + data.getServerName() + " but region was in " +
                "the state " + regionState + " and not " +
                "in expected PENDING_OPEN or OPENING states");
            return;
          }
          // Transition to OPENING (or update stamp if already OPENING)
          regionState.update(RegionState.State.OPENING, data.getStamp());
          break;

        case RS_ZK_REGION_OPENED:
          // Should see OPENED after OPENING but possible after PENDING_OPEN
          if (regionState == null ||
              (!regionState.isPendingOpen() && !regionState.isOpening())) {
            LOG.warn("Received OPENED for region " +
                prettyPrintedRegionName +
                " from server " + data.getServerName() + " but region was in " +
                "the state " + regionState + " and not " +
                "in expected PENDING_OPEN or OPENING states");
            return;
          }
          // Handle OPENED by removing from transition and deleting the zk node
          regionState.update(RegionState.State.OPEN, data.getStamp());
          this.executorService.submit(
            new OpenedRegionHandler(master, this, regionState.getRegion(),
              this.serverManager.getServerInfo(data.getServerName())));
          break;
      }
    }
  }

  /**
   * Handle a ZK unassigned node transition triggered by HBCK repair tool.
   * <p>
   * This is handled in a separate code path because it breaks the normal rules.
   * @param data
   */
  private void handleHBCK(RegionTransitionData data) {
    String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
    LOG.info("Handling HBCK triggered transition=" + data.getEventType() +
      ", server=" + data.getServerName() + ", region=" +
      HRegionInfo.prettyPrint(encodedName));
    RegionState regionState = regionsInTransition.get(encodedName);
    switch (data.getEventType()) {
      case M_ZK_REGION_OFFLINE:
        HRegionInfo regionInfo = null;
        if (regionState != null) {
          regionInfo = regionState.getRegion();
        } else {
          try {
            regionInfo = MetaReader.getRegion(catalogTracker,
                data.getRegionName()).getFirst();
          } catch (IOException e) {
            LOG.info("Exception reading META doing HBCK repair operation", e);
            return;
          }
        }
        LOG.info("HBCK repair is triggering assignment of region=" +
            regionInfo.getRegionNameAsString());
        // trigger assign; node is already in OFFLINE so don't need to update ZK
        assign(regionInfo, false);
        break;

      default:
        LOG.warn("Received unexpected region state from HBCK (" +
            data.getEventType() + ")");
        break;
    }
  }

  // ZooKeeper events

  /**
   * New unassigned node has been created.
   *
   * <p>This happens when an RS begins the OPENING or CLOSING of a region by
   * creating an unassigned node.
   *
   * <p>When this happens we must:
   * <ol>
   *   <li>Watch the node for further events</li>
   *   <li>Read and handle the state in the node</li>
   * </ol>
   */
  @Override
  public void nodeCreated(String path) {
    if(path.startsWith(watcher.assignmentZNode)) {
      synchronized(regionsInTransition) {
        try {
          RegionTransitionData data = ZKAssign.getData(watcher, path);
          if(data == null) {
            return;
          }
          handleRegion(data);
        } catch (KeeperException e) {
          master.abort("Unexpected ZK exception reading unassigned node data", e);
        }
      }
    }
  }

  /**
   * Existing unassigned node has had data changed.
   *
   * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
   * OPENING/OPENED and CLOSING/CLOSED.
   *
   * <p>When this happens we must:
   * <ol>
   *   <li>Watch the node for further events</li>
   *   <li>Read and handle the state in the node</li>
   * </ol>
   */
  @Override
  public void nodeDataChanged(String path) {
    if(path.startsWith(watcher.assignmentZNode)) {
      synchronized(regionsInTransition) {
        try {
          RegionTransitionData data = ZKAssign.getData(watcher, path);
          if(data == null) {
            return;
          }
          handleRegion(data);
        } catch (KeeperException e) {
          master.abort("Unexpected ZK exception reading unassigned node data", e);
        }
      }
    }
  }

  /**
   * New unassigned node has been created.
   *
   * <p>This happens when an RS begins the OPENING or CLOSING of a region by
   * creating an unassigned node.
   *
   * <p>When this happens we must:
   * <ol>
   *   <li>Watch the node for further children changed events</li>
   *   <li>Watch all new children for changed events</li>
   *   <li>Read all children and handle them</li>
   * </ol>
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if(path.equals(watcher.assignmentZNode)) {
      synchronized(regionsInTransition) {
        try {
          List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
              watcher.assignmentZNode);
          for(NodeAndData newNode : newNodes) {
            LOG.debug("Handling new unassigned node: " + newNode);
            handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
          }
        } catch(KeeperException e) {
          master.abort("Unexpected ZK exception reading unassigned children", e);
        }
      }
    }
  }

  /**
   * Marks the region as online.  Removes it from regions in transition and
   * updates the in-memory assignment information.
   * <p>
   * Used when a region has been successfully opened on a region server.
   * @param regionInfo
   * @param serverInfo
   */
  public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) {
    synchronized (this.regionsInTransition) {
      RegionState rs =
        this.regionsInTransition.remove(regionInfo.getEncodedName());
      if (rs != null) {
        this.regionsInTransition.notifyAll();
      }
    }
    synchronized (this.regions) {
      // Sanity check: we should not be overwriting an existing assignment.
      HServerInfo hsi = this.regions.get(regionInfo);
      if (hsi != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
        " on " + hsi);

      HServerInfo hsiWithoutLoad = new HServerInfo(
        serverInfo.getServerAddress(), serverInfo.getStartCode(),
        serverInfo.getInfoPort(), serverInfo.getHostname());

      if (isServerOnline(hsiWithoutLoad.getServerName())) {
        this.regions.put(regionInfo, hsiWithoutLoad);
        addToServers(hsiWithoutLoad, regionInfo);
        this.regions.notifyAll();
      } else {
        LOG.info("The server is not in online servers, ServerName=" +
          hsiWithoutLoad.getServerName() + ", region=" +
          regionInfo.getEncodedName());
      }
    }
    // Remove plan if one.
    clearRegionPlan(regionInfo);
    // Update timers for all regions in transition going against this server.
    updateTimers(serverInfo);
  }

  /**
   * Touch timers for all regions in transition that have the passed
   * <code>hsi</code> in common.
   * Call this method whenever a server checks in.  Doing so helps the case where
   * a new regionserver has joined the cluster and it's been given 1k regions to
   * open.  If this method is tickled every time a region reports a successful
   * open then the 1k-th region won't be timed out just because it's sitting
   * behind the open of 999 other regions.  This method is NOT used as part of
   * bulk assign -- there we have a different mechanism for extending the
   * regions in transition timer (we turn it off temporarily because there is
   * no regionplan involved when bulk assigning).
   * @param hsi
   */
  private void updateTimers(final HServerInfo hsi) {
    // This loop could be expensive.
    // First make a copy of current regionPlan rather than hold sync while
    // looping because holding sync can cause deadlock.  It's ok in this loop
    // if the Map we're going against is a little stale
    Map<String, RegionPlan> copy = new HashMap<String, RegionPlan>();
    synchronized(this.regionPlans) {
      copy.putAll(this.regionPlans);
    }
    for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
      if (!e.getValue().getDestination().equals(hsi)) continue;
      RegionState rs = null;
      synchronized (this.regionsInTransition) {
        rs = this.regionsInTransition.get(e.getKey());
      }
      if (rs == null) continue;
      synchronized (rs) {
        rs.update(rs.getState());
      }
    }
  }

  /**
   * Marks the region as offline.  Removes it from regions in transition and
   * removes in-memory assignment information.
   * <p>
   * Used when a region has been closed and should remain closed.
   * @param regionInfo
   */
  public void regionOffline(final HRegionInfo regionInfo) {
    synchronized(this.regionsInTransition) {
      if (this.regionsInTransition.remove(regionInfo.getEncodedName()) != null) {
        this.regionsInTransition.notifyAll();
      }
    }
    // remove the region plan as well just in case.
    clearRegionPlan(regionInfo);
    setOffline(regionInfo);
  }

  /**
   * Sets the region as offline by removing in-memory assignment information but
   * retaining transition information.
   * <p>
   * Used when a region has been closed but should be reassigned.
   * @param regionInfo
   */
  public void setOffline(HRegionInfo regionInfo) {
    synchronized (this.regions) {
      HServerInfo serverInfo = this.regions.remove(regionInfo);
      if (serverInfo == null) return;
      Set<HRegionInfo> serverRegions = this.servers.get(serverInfo);
      if (!serverRegions.remove(regionInfo)) {
        LOG.warn("No " + regionInfo + " on " + serverInfo);
      }
    }
  }

  public void offlineDisabledRegion(HRegionInfo regionInfo) {
    // Disabling so should not be reassigned, just delete the CLOSED node
    LOG.debug("Table being disabled so deleting ZK node and removing from " +
        "regions in transition, skipping assignment of region " +
          regionInfo.getRegionNameAsString());
    try {
      if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
        // Could also be in OFFLINE mode
        ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
      }
    } catch (KeeperException.NoNodeException nne) {
      LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
          "does not exist so just offlining");
    } catch (KeeperException e) {
      this.master.abort("Error deleting CLOSED node in ZK", e);
    }
    regionOffline(regionInfo);
  }

  // Assignment methods

  /**
   * Assigns the specified region.
   * <p>
   * If a RegionPlan is available with a valid destination then it will be used
   * to determine what server region is assigned to.  If no RegionPlan is
   * available, region will be assigned to a random available server.
   * <p>
   * Updates the RegionState and sends the OPEN RPC.
   * <p>
   * This will only succeed if the region is in transition and in a CLOSED or
   * OFFLINE state or not in transition (in-memory not zk), and of course, the
   * chosen server is up and running (it may have just crashed!).  If the
   * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
   *
   * @param region region to be assigned
   * @param setOfflineInZK whether ZK node should be created/transitioned to an
   *                       OFFLINE state before assigning the region
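   * <p>For example, to (re)deploy a region whose znode may be stale (an
   * illustrative call; <code>hri</code> is assumed in scope):
   * <pre>
   *   am.assign(hri, true);  // force znode OFFLINE, then send the OPEN RPC
   * </pre>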
   */
  public void assign(HRegionInfo region, boolean setOfflineInZK) {
    assign(region, setOfflineInZK, false);
  }

  public void assign(HRegionInfo region, boolean setOfflineInZK,
      boolean forceNewPlan) {
    String tableName = region.getTableDesc().getNameAsString();
    boolean disabled = this.zkTable.isDisabledTable(tableName);
    if (disabled || this.zkTable.isDisablingTable(tableName)) {
      LOG.info("Table " + tableName + (disabled? " disabled;": " disabling;") +
        " skipping assign of " + region.getRegionNameAsString());
      offlineDisabledRegion(region);
      return;
    }
    if (this.serverManager.isClusterShutdown()) {
      LOG.info("Cluster shutdown is set; skipping assign of " +
        region.getRegionNameAsString());
      return;
    }
    RegionState state = addToRegionsInTransition(region);
    synchronized (state) {
      assign(state, setOfflineInZK, forceNewPlan);
    }
  }

  /**
   * Bulk assign regions to <code>destination</code>.
   * @param destination
   * @param regions Regions to assign.
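   * <p>Illustrative call, assuming a per-server plan computed elsewhere:
   * <pre>
   *   for (Map.Entry&lt;HServerInfo, List&lt;HRegionInfo&gt;&gt; e:
   *       bulkPlan.entrySet()) {
   *     am.assign(e.getKey(), e.getValue());
   *   }
   * </pre>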
   */
  void assign(final HServerInfo destination,
      final List<HRegionInfo> regions) {
    LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
      destination.getServerName());

    List<RegionState> states = new ArrayList<RegionState>(regions.size());
    synchronized (this.regionsInTransition) {
      for (HRegionInfo region: regions) {
        states.add(forceRegionStateToOffline(region));
      }
    }
    // Presumption is that only this thread will be updating the state at this
    // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
    AtomicInteger counter = new AtomicInteger(0);
    CreateUnassignedAsyncCallback cb =
      new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
    for (RegionState state: states) {
      if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
        return;
      }
    }
    // Wait until all unassigned nodes have been put up and watchers set.
    int total = regions.size();
    for (int oldCounter = 0; true;) {
      int count = counter.get();
      if (oldCounter != count) {
        LOG.info(destination.getServerName() + " unassigned znodes=" + count +
          " of total=" + total);
        oldCounter = count;
      }
      if (count == total) break;
      Threads.sleep(1);
    }
    try {
      long maxWaitTime = System.currentTimeMillis() +
        this.master.getConfiguration().
          getLong("hbase.regionserver.rpc.startup.waittime", 60000);
      while (!this.master.isStopped()) {
        try {
          this.serverManager.sendRegionOpen(destination, regions);
          break;
        } catch (org.apache.hadoop.hbase.ipc.ServerNotRunningException e) {
          // This is the one exception to retry.  For all else we should just fail
          // the startup.
          long now = System.currentTimeMillis();
          if (now > maxWaitTime) throw e;
          LOG.debug("Server is not yet up; waiting up to " +
              (maxWaitTime - now) + "ms", e);
          Thread.sleep(1000);
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    LOG.debug("Bulk assigning done for " + destination.getServerName());
  }

  /**
   * Callback handler for create unassigned znodes used during bulk assign.
   */
  static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
    private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
    private final ZooKeeperWatcher zkw;
    private final HServerInfo destination;
    private final AtomicInteger counter;

    CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
        final HServerInfo destination, final AtomicInteger counter) {
      this.zkw = zkw;
      this.destination = destination;
      this.counter = counter;
    }

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
      if (rc != 0) {
        // This is the result code.  If non-zero, we need to resubmit.
        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
          "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
        this.zkw.abort("Connectionloss writing unassigned at " + path +
          ", rc=" + rc, null);
        return;
      }
      LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.getServerName());
      // Async exists to set a watcher so we'll get triggered when
      // unassigned node changes.
      this.zkw.getZooKeeper().exists(path, this.zkw,
        new ExistsUnassignedAsyncCallback(this.counter), ctx);
    }
  }

  /**
   * Callback handler for the exists call that sets watcher on unassigned znodes.
   * Used during bulk assign on startup.
   */
  static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
    private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
    private final AtomicInteger counter;

    ExistsUnassignedAsyncCallback(final AtomicInteger counter) {
      this.counter = counter;
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
      if (rc != 0) {
        // This is the result code.  If non-zero, we need to resubmit.
        LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
          "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
        return;
      }
      RegionState state = (RegionState)ctx;
      LOG.debug("rs=" + state);
      // Transition RegionState to PENDING_OPEN here in master; means we've
      // sent the open.  We're a little ahead of ourselves here since we've not
      // yet sent out the actual open but putting this state change after the
      // call to open risks our writing PENDING_OPEN after state has been moved
      // to OPENING by the regionserver.
      state.update(RegionState.State.PENDING_OPEN);
      this.counter.addAndGet(1);
    }
  }

  /**
   * @param region
   * @return The current RegionState for <code>region</code>, forced to OFFLINE.
   */
  private RegionState addToRegionsInTransition(final HRegionInfo region) {
    synchronized (regionsInTransition) {
      return forceRegionStateToOffline(region);
    }
  }

  /**
   * Sets the region's {@link RegionState} to {@link RegionState.State#OFFLINE}.
   * Caller must hold lock on this.regionsInTransition.
   * @param region
   * @return Amended RegionState.
   */
  private RegionState forceRegionStateToOffline(final HRegionInfo region) {
    String encodedName = region.getEncodedName();
    RegionState state = this.regionsInTransition.get(encodedName);
    if (state == null) {
      state = new RegionState(region, RegionState.State.OFFLINE);
      this.regionsInTransition.put(encodedName, state);
    } else {
      LOG.debug("Forcing OFFLINE; was=" + state);
      state.update(RegionState.State.OFFLINE);
    }
    return state;
  }

  /**
   * Caller must hold lock on the passed <code>state</code> object.
   * @param state
   * @param setOfflineInZK
   * @param forceNewPlan
   */
  private void assign(final RegionState state, final boolean setOfflineInZK,
      final boolean forceNewPlan) {
    for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
      if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
      if (this.master.isStopped()) {
        LOG.debug("Server stopped; skipping assign of " + state);
        return;
      }
      RegionPlan plan = getRegionPlan(state, forceNewPlan);
      if (plan == null) return; // Should get reassigned later when RIT times out.
      try {
        LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
          " to " + plan.getDestination().getServerName());
        // Transition RegionState to PENDING_OPEN
        state.update(RegionState.State.PENDING_OPEN);
        // Send OPEN RPC. This can fail if the server on the other end is not up.
        serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
        break;
      } catch (Throwable t) {
        LOG.warn("Failed assignment of " +
          state.getRegion().getRegionNameAsString() + " to " +
          plan.getDestination() + ", trying to assign elsewhere instead; " +
          "retry=" + i, t);
        // Clean out the plan we failed to execute; it doesn't look like it'll
        // succeed anyways, so we need a new plan.
        // Transition back to OFFLINE
        state.update(RegionState.State.OFFLINE);
        // Force a new plan and reassign.  getRegionPlan will return null if
        // there are no servers.
        if (getRegionPlan(state, plan.getDestination(), true) == null) {
          LOG.warn("Unable to find a viable location to assign region " +
            state.getRegion().getRegionNameAsString());
          return;
        }
      }
    }
  }

  /**
   * Set region as OFFLINED up in zookeeper
   * @param state
   * @return True if we succeeded, false otherwise (State was incorrect or failed
   * updating zk).
   */
  boolean setOfflineInZooKeeper(final RegionState state) {
    if (!state.isClosed() && !state.isOffline()) {
      this.master.abort("Unexpected state trying to OFFLINE; " + state,
        new IllegalStateException("Unexpected state trying to OFFLINE; " + state));
      return false;
    }
    state.update(RegionState.State.OFFLINE);
    try {
      if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
          state.getRegion(), master.getServerName())) {
        LOG.warn("Attempted to create/force node into OFFLINE state before " +
          "completing assignment but failed to do so for " + state);
        return false;
      }
    } catch (KeeperException e) {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
      return false;
    }
    return true;
  }

  /**
   * Set region as OFFLINED up in zookeeper asynchronously.
   * @param state
   * @return True if we succeeded, false otherwise (State was incorrect or failed
   * updating zk).
   */
  boolean asyncSetOfflineInZooKeeper(final RegionState state,
      final AsyncCallback.StringCallback cb, final Object ctx) {
    if (!state.isClosed() && !state.isOffline()) {
      this.master.abort("Unexpected state trying to OFFLINE; " + state,
        new IllegalStateException("Unexpected state trying to OFFLINE; " + state));
      return false;
    }
    state.update(RegionState.State.OFFLINE);
    try {
      ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
        master.getServerName(), cb, ctx);
    } catch (KeeperException e) {
      master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
      return false;
    }
    return true;
  }

  /**
   * @param state
   * @return Plan for the passed <code>state</code>.  If none exists, one is
   * created; if there are no servers to assign to, returns null.
   */
  RegionPlan getRegionPlan(final RegionState state,
      final boolean forceNewPlan) {
    return getRegionPlan(state, null, forceNewPlan);
  }

  /**
   * @param state
   * @param serverToExclude Server to exclude (we know it's bad). Pass null if
   * all servers are thought to be assignable.
   * @param forceNewPlan If true, then if an existing plan exists, a new plan
   * will be generated.
   * @return Plan for the passed <code>state</code>.  If none exists, one is
   * created; if there are no servers to assign to, returns null.
   */
  RegionPlan getRegionPlan(final RegionState state,
      final HServerInfo serverToExclude, final boolean forceNewPlan) {
    // Pickup existing plan or make a new one
    String encodedName = state.getRegion().getEncodedName();
    List<HServerInfo> servers = this.serverManager.getOnlineServersList();
    // The remove below hinges on the fact that the call to
    // serverManager.getOnlineServersList() returns a copy
    if (serverToExclude != null) servers.remove(serverToExclude);
    if (servers.isEmpty()) return null;
    RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
      LoadBalancer.randomAssignment(servers));
    boolean newPlan = false;
    RegionPlan existingPlan = null;
    synchronized (this.regionPlans) {
      existingPlan = this.regionPlans.get(encodedName);
      if (forceNewPlan || existingPlan == null ||
          existingPlan.getDestination() == null ||
          existingPlan.getDestination().equals(serverToExclude)) {
        newPlan = true;
        this.regionPlans.put(encodedName, randomPlan);
      }
    }
    if (newPlan) {
      LOG.debug("No previous transition plan was found (or we are ignoring " +
        "an existing plan) for " + state.getRegion().getRegionNameAsString() +
        " so generated a random one; " + randomPlan + "; " +
        serverManager.countOfRegionServers() +
        " (online=" + serverManager.getOnlineServers().size() +
        ", exclude=" + serverToExclude + ") available servers");
      return randomPlan;
    }
    LOG.debug("Using pre-existing plan for region " +
      state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
    return existingPlan;
  }

  /**
   * Unassigns the specified region.
   * <p>
   * Updates the RegionState and sends the CLOSE RPC.
   * <p>
   * If a RegionPlan is already set, it will remain.
   *
   * @param region region to be unassigned
   */
  public void unassign(HRegionInfo region) {
    unassign(region, false);
  }

  /**
   * Unassigns the specified region.
   * <p>
   * Updates the RegionState and sends the CLOSE RPC.
   * <p>
   * If a RegionPlan is already set, it will remain.
   *
   * @param region region to be unassigned
   * @param force if region should be closed even if already closing
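   * <p>Sketch (<code>hri</code> assumed in scope):
   * <pre>
   *   am.unassign(hri);        // normal close
   *   am.unassign(hri, true);  // force another CLOSE even if already pending
   * </pre>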
   */
  public void unassign(HRegionInfo region, boolean force) {
    LOG.debug("Starting unassignment of region " +
      region.getRegionNameAsString() + " (offlining)");
    synchronized (this.regions) {
      // Check if this region is currently assigned
      if (!regions.containsKey(region)) {
        LOG.debug("Attempted to unassign region " +
          region.getRegionNameAsString() + " but it is not " +
          "currently assigned anywhere");
        return;
      }
    }
    String encodedName = region.getEncodedName();
    // Grab the state of this region and synchronize on it
    RegionState state;
    synchronized (regionsInTransition) {
      state = regionsInTransition.get(encodedName);
      if (state == null) {
        state = new RegionState(region, RegionState.State.PENDING_CLOSE);
        regionsInTransition.put(encodedName, state);
      } else if (force && state.isPendingClose()) {
        LOG.debug("Attempting to unassign region " +
            region.getRegionNameAsString() + " which is already pending close "
            + "but forcing an additional close");
        state.update(RegionState.State.PENDING_CLOSE);
      } else {
        LOG.debug("Attempting to unassign region " +
          region.getRegionNameAsString() + " but it is " +
          "already in transition (" + state.getState() + ")");
        return;
      }
    }
    // Send CLOSE RPC
    HServerInfo server = null;
    synchronized (this.regions) {
      server = regions.get(region);
    }
    try {
      if (serverManager.sendRegionClose(server, state.getRegion())) {
        LOG.debug("Sent CLOSE to " + server + " for region " +
          region.getRegionNameAsString());
        return;
      }
      // This never happens.  Currently a regionserver close always returns true.
      LOG.debug("Server " + server + " region CLOSE RPC returned false for " +
        region.getEncodedName());
    } catch (NotServingRegionException nsre) {
      LOG.info("Server " + server + " returned " + nsre + " for " +
        region.getEncodedName());
      // Presume that master has stale data.  Presume remote side just split.
      // Presume that the split message when it comes in will fix up the master's
      // in memory cluster state.
      return;
    } catch (Throwable t) {
      if (t instanceof RemoteException) {
        t = ((RemoteException)t).unwrapRemoteException();
        if (t instanceof NotServingRegionException) {
          if (checkIfRegionBelongsToDisabling(region)) {
            // Remove from the regionsInTransition map
            LOG.info("While trying to recover the table "
                + region.getTableDesc().getNameAsString()
                + " to DISABLED state the region " + region
                + " was offlined but the table was in DISABLING state");
            synchronized (this.regionsInTransition) {
              this.regionsInTransition.remove(region.getEncodedName());
            }
            // Remove from the regions map
            synchronized (this.regions) {
              this.regions.remove(region);
            }
          }
        }
      }
      LOG.info("Server " + server + " returned " + t + " for " +
        region.getEncodedName());
      // Presume retry or server will expire.
    }
  }

  /**
   * Waits until the specified region has completed assignment.
   * <p>
   * If the region is already assigned, returns immediately.  Otherwise, method
   * blocks until the region is assigned.
   * @param regionInfo region to wait on assignment for
   * @throws InterruptedException
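   * <p>E.g. making sure <code>.META.</code> is deployed before reading it
   * (sketch):
   * <pre>
   *   am.assignMeta();
   *   am.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
   * </pre>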
   */
  public void waitForAssignment(HRegionInfo regionInfo)
  throws InterruptedException {
    synchronized(regions) {
      while(!regions.containsKey(regionInfo)) {
        regions.wait();
      }
    }
  }

  /**
   * Assigns the ROOT region.
   * <p>
   * Assumes that ROOT is currently closed and is not being actively served by
   * any RegionServer.
   * <p>
   * Forcibly unsets the current root region location in ZooKeeper and assigns
   * ROOT to a random RegionServer.
   * @throws KeeperException
   */
  public void assignRoot() throws KeeperException {
    RootLocationEditor.deleteRootLocation(this.master.getZooKeeper());
    assign(HRegionInfo.ROOT_REGIONINFO, true);
  }

  /**
   * Assigns the META region.
   * <p>
   * Assumes that META is currently closed and is not being actively served by
   * any RegionServer.
   * <p>
   * Forcibly assigns META to a random RegionServer.
   */
  public void assignMeta() {
    // Force assignment to a random server
    assign(HRegionInfo.FIRST_META_REGIONINFO, true);
  }

  /**
   * Assigns all user regions, if any.  Used during cluster startup.
   * <p>
   * This is a synchronous call and will return once every region has been
   * assigned.  If anything fails, an exception is thrown and the cluster
   * should be shutdown.
   * @throws InterruptedException
   * @throws IOException
1328    */
1329   public void assignAllUserRegions() throws IOException, InterruptedException {
1330     // Get all available servers
1331     List<HServerInfo> servers = serverManager.getOnlineServersList();
1332 
1333     // Scan META for all user regions, skipping any disabled tables
1334     Map<HRegionInfo,HServerAddress> allRegions =
1335       MetaReader.fullScan(catalogTracker, this.zkTable.getDisabledTables(), true);
1336     if (allRegions == null || allRegions.isEmpty()) return;
1337 
1338     // Determine what type of assignment to do on startup
1339     boolean retainAssignment = master.getConfiguration().
1340       getBoolean("hbase.master.startup.retainassign", true);
1341 
1342     Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
1343     if (retainAssignment) {
1344       // Reuse existing assignment info
1345       bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
1346     } else {
1347       // assign regions in round-robin fashion
1348       bulkPlan = LoadBalancer.roundRobinAssignment(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
1349     }
1350     LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
1351       servers.size() + " server(s), retainAssignment=" + retainAssignment);
1352 
1353     // Use a fixed-count thread pool to do the assigning.
1354     BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
1355     ba.bulkAssign();
1356     LOG.info("Bulk assigning done");
1357   }
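
  // Configuration sketch: the startup assignment mode is driven by the
  // "hbase.master.startup.retainassign" boolean read above; to force
  // round-robin assignment instead of retained assignments one could set
  // (illustrative):
  //
  //   master.getConfiguration().setBoolean("hbase.master.startup.retainassign", false);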
1358 
1359   /**
1360    * Run bulk assign on startup.  Does one RPC per regionserver, passing a
1361    * batch of regions using {@link SingleServerBulkAssigner}.
1362    * Uses the default {@link #getUncaughtExceptionHandler()},
1363    * which will abort the Server on an uncaught exception.
1364    */
1365   static class StartupBulkAssigner extends BulkAssigner {
1366     final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
1367     final AssignmentManager assignmentManager;
1368 
1369     StartupBulkAssigner(final Server server,
1370         final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
1371         final AssignmentManager am) {
1372       super(server);
1373       this.bulkPlan = bulkPlan;
1374       this.assignmentManager = am;
1375     }
1376 
1377     @Override
1378     public boolean bulkAssign(boolean sync) throws InterruptedException {
1379       // Disable timing out regions in transition up in zk while bulk assigning.
1380       this.assignmentManager.timeoutMonitor.bulkAssign(true);
1381       try {
1382         return super.bulkAssign(sync);
1383       } finally {
1384         // Reenable timing out regions in transition up in zk.
1385         this.assignmentManager.timeoutMonitor.bulkAssign(false);
1386       }
1387     }
1388 
1389     @Override
1390     protected String getThreadNamePrefix() {
1391       return this.server.getServerName() + "-StartupBulkAssigner";
1392     }
1393 
1394     @Override
1395     protected void populatePool(java.util.concurrent.ExecutorService pool) {
1396       for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
1397         pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
1398           this.assignmentManager, true));
1399       }
1400     }
1401 
1402     protected boolean waitUntilDone(final long timeout)
1403     throws InterruptedException {
1404       Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
1405       for (List<HRegionInfo> regionList : bulkPlan.values()) {
1406         regionSet.addAll(regionList);
1407       }
1408       return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet);
1409     }
1410 
1411     @Override
1412     protected long getTimeoutOnRIT() {
1413       // Guess timeout.  Multiply the number of regions on an arbitrary server
1414       // by how long we think one region takes to open.
1415       long perRegionOpenTimeGuesstimate =
1416         this.server.getConfiguration().getLong("hbase.bulk.assignment.perregion.open.time", 1000);
1417       int regionsPerServer =
1418         this.bulkPlan.entrySet().iterator().next().getValue().size();
1419       long timeout = perRegionOpenTimeGuesstimate * regionsPerServer;
1420       LOG.debug("Timeout-on-RIT=" + timeout);
1421       return timeout;
1422     }
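
    // Worked example (illustrative figures): with the default of 1000 ms per
    // region and a plan that puts 300 regions on the sampled server, the RIT
    // timeout comes out to 1000 * 300 = 300000 ms, i.e. five minutes.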
1423   }
1424 
1425   /**
1426    * Bulk user region assigner.
1427    * If an assign fails, lets the timeout on regions in transition do the cleanup.
1428    */
1429   static class GeneralBulkAssigner extends StartupBulkAssigner {
1430     GeneralBulkAssigner(final Server server,
1431         final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
1432         final AssignmentManager am) {
1433       super(server, bulkPlan, am);
1434     }
1435 
1436     @Override
1437     protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
1438       return new UncaughtExceptionHandler() {
1439         @Override
1440         public void uncaughtException(Thread t, Throwable e) {
1441           LOG.warn("Assigning regions in " + t.getName(), e);
1442         }
1443       };
1444     }
1445   }
1446 
1447   /**
1448    * Manage bulk assigning to a server.
1449    */
1450   static class SingleServerBulkAssigner implements Runnable {
1451     private final HServerInfo regionserver;
1452     private final List<HRegionInfo> regions;
1453     private final AssignmentManager assignmentManager;
1454 
1455     SingleServerBulkAssigner(final HServerInfo regionserver,
1456         final List<HRegionInfo> regions, final AssignmentManager am,
1457         final boolean startUp) {  // startUp is currently unused
1458       this.regionserver = regionserver;
1459       this.regions = regions;
1460       this.assignmentManager = am;
1461     }
1462     @Override
1463     public void run() {
1464       this.assignmentManager.assign(this.regionserver, this.regions);
1465     }
1466   }
1467 
1468   /**
1469    * Wait until no regions in transition.
1470    * @param timeout How long to wait.
1471    * @return True if nothing in regions in transition.
1472    * @throws InterruptedException
1473    */
1474   boolean waitUntilNoRegionsInTransition(final long timeout)
1475   throws InterruptedException {
1476     // Blocks until there are no regions in transition.  It is possible
1477     // that there are regions in transition immediately after this
1478     // returns, but it guarantees that if it returns without an
1479     // exception, there was a period of time with no regions in
1480     // transition from the point-of-view of the in-memory state of
1481     // the Master.
1482     long startTime = System.currentTimeMillis();
1483     long remaining = timeout;
1484     synchronized (regionsInTransition) {
1485       while (regionsInTransition.size() > 0 && !this.master.isStopped()
1486           && remaining > 0) {
1487         regionsInTransition.wait(remaining);
1488         remaining = timeout - (System.currentTimeMillis() - startTime);
1489       }
1490     }
1491     return regionsInTransition.isEmpty();
1492   }
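
  // Usage sketch (illustrative): since the method returns false when the
  // wait ran out with work still pending, callers can loop on it:
  //
  //   while (!am.waitUntilNoRegionsInTransition(60 * 1000)) {
  //     LOG.info("Regions still in transition; waiting again");
  //   }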
1493 
1494   /**
1495    * Wait until no regions from set regions are in transition.
1496    * @param timeout How long to wait.
1497    * @param regions set of regions to wait for
1498    * @return True if some of the given regions were still in transition when the wait ended.
1499    * @throws InterruptedException
1500    */
1501   boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
1502   throws InterruptedException {
1503     // Blocks until there are no regions in transition.
1504     long startTime = System.currentTimeMillis();
1505     long remaining = timeout;
1506     boolean stillInTransition = true;
1507     synchronized (regionsInTransition) {
1508       while (regionsInTransition.size() > 0 && !this.master.isStopped() &&
1509           remaining > 0 && stillInTransition) {
1510         int count = 0;
1511         for (RegionState rs : regionsInTransition.values()) {
1512           if (regions.contains(rs.getRegion())) {
1513             count++;
1514             break;
1515           }
1516         }
1517         if (count == 0) {
1518           stillInTransition = false;
1519           break;
1520         }
1521         regionsInTransition.wait(remaining);
1522         remaining = timeout - (System.currentTimeMillis() - startTime);
1523       }
1524     }
1525     return stillInTransition;
1526   }
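
  // Usage sketch (illustrative): build the region set from a bulk plan and
  // wait on just those regions, as StartupBulkAssigner#waitUntilDone does:
  //
  //   Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
  //   for (List<HRegionInfo> regionList : bulkPlan.values()) {
  //     regionSet.addAll(regionList);
  //   }
  //   boolean stillInTransition =
  //     am.waitUntilNoRegionsInTransition(timeout, regionSet);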
1527 
1528   /**
1529    * Rebuild the list of user regions and assignment information.
1530    * <p>
1531    * Returns a map of servers that are not found to be online and the regions
1532    * they were hosting.
1533    * @return map of servers not online to their assigned regions, as stored
1534    *         in META
1535    * @throws IOException
1536    * @throws KeeperException
1537    */
1538   private Map<String, List<Pair<HRegionInfo,Result>>> rebuildUserRegions()
1539   throws IOException, KeeperException {
1540     // Region assignment from META
1541     List<Result> results = MetaReader.fullScanOfResults(catalogTracker);
1542     // Map of offline servers and their regions to be returned
1543     Map<String, List<Pair<HRegionInfo,Result>>> offlineServers =
1544       new TreeMap<String, List<Pair<HRegionInfo,Result>>>();
1545     // Store the names of all tables in DISABLING state
1546     Set<String> disablingTables = new HashSet<String>(1);
1547     // Iterate regions in META
1548     for (Result result : results) {
1549       Pair<HRegionInfo,HServerInfo> region =
1550         MetaReader.metaRowToRegionPairWithInfo(result);
1551       if (region == null) continue;
1552       HServerInfo regionLocation = region.getSecond();
1553       HRegionInfo regionInfo = region.getFirst();
1554       String tableName = regionInfo.getTableDesc().getNameAsString();
1555       if (regionLocation == null) {
1556         // Region not being served, add to region map with no assignment
1557         // If this needs to be assigned out, it will also be in ZK as RIT
1558         // Add only if the table is not in DISABLED state
1559         if (!checkIfRegionBelongsToDisabled(regionInfo)) {
1560           this.regions.put(regionInfo, null);
1561         }
1562         if (checkIfRegionBelongsToDisabling(regionInfo)) {
1563           disablingTables.add(tableName);
1564         }
1565       } else if (!serverManager.isServerOnline(regionLocation.getServerName())) {
1566         // Region is located on a server that isn't online
1567         List<Pair<HRegionInfo,Result>> offlineRegions =
1568           offlineServers.get(regionLocation.getServerName());
1569         if (offlineRegions == null) {
1570           offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
1571           offlineServers.put(regionLocation.getServerName(), offlineRegions);
1572         }
1573         offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
1574       } else {
1575         // Region is being served and on an active server
1576         // Add only if the region does not belong to a disabled table
1577         if (!checkIfRegionBelongsToDisabled(regionInfo)) {
1578           regions.put(regionInfo, regionLocation);
1579           addToServers(regionLocation, regionInfo);
1580         }
1581         if (checkIfRegionBelongsToDisabling(regionInfo)) {
1582           disablingTables.add(tableName);
1583         }
1584       }
1585     }
1586     // Recover the tables that were not fully moved to DISABLED state.
1587     // These tables are in DISABLING state when the master
1588     // restarted/switched.
1589     if (disablingTables.size() != 0) {
1590       // Create a watcher on the zookeeper node
1591       ZKUtil.listChildrenAndWatchForNewChildren(watcher,
1592           watcher.assignmentZNode);
1593       for (String tableName : disablingTables) {
1594         // Recover by calling DisableTableHandler
1595         LOG.info("The table " + tableName
1596             + " is in DISABLING state.  Hence recovering by moving the table"
1597             + " to DISABLED state.");
1598         new DisableTableHandler(this.master, Bytes.toBytes(tableName),
1599             catalogTracker, this).process();
1600       }
1601     }
1602     return offlineServers;
1603   }
1604   
1605   private boolean checkIfRegionBelongsToDisabled(HRegionInfo regionInfo) {
1606     String tableName = regionInfo.getTableDesc().getNameAsString();
1607     return getZKTable().isDisabledTable(tableName);
1608   }
1609 
1610   private boolean checkIfRegionBelongsToDisabling(HRegionInfo regionInfo) {
1611     String tableName = regionInfo.getTableDesc().getNameAsString();
1612     return getZKTable().isDisablingTable(tableName);
1613   }
1614 
1615   /**
1616    * Processes list of dead servers from result of META scan.
1617    * <p>
1618    * This is used as part of failover to handle RegionServers which failed
1619    * while there was no active master.
1620    * <p>
1621    * Stubs in-memory data so it looks as the normal server shutdown handler
1622    * expects it to.
1623    *
1624    * @param deadServers
1625    * @throws IOException
1626    * @throws KeeperException
1627    */
1628   private void processDeadServers(
1629       Map<String, List<Pair<HRegionInfo, Result>>> deadServers)
1630   throws IOException, KeeperException {
1631     for (Map.Entry<String, List<Pair<HRegionInfo,Result>>> deadServer :
1632       deadServers.entrySet()) {
1633       List<Pair<HRegionInfo,Result>> regions = deadServer.getValue();
1634       for (Pair<HRegionInfo,Result> region : regions) {
1635         HRegionInfo regionInfo = region.getFirst();
1636         Result result = region.getSecond();
1637         // If region was in transition (was in zk) force it offline for reassign
1638         try {
1639           //Process with existing RS shutdown code  
1640           boolean assign =
1641             ServerShutdownHandler.processDeadRegion(regionInfo, result, this,
1642               this.catalogTracker);
1643           if (assign) {
1644             ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
1645               master.getServerName()); 
1646           }
1647         } catch (KeeperException.NoNodeException nne) {
1648           // This is fine; the znode is already gone, so there is nothing to force offline.
1649         }
1650       }
1651     }
1652   }
1653 
1654   /*
1655    * Presumes the caller has taken care of the locking needed to modify the servers Map.
1656    * @param hsi
1657    * @param hri
1658    */
1659   private void addToServers(final HServerInfo hsi, final HRegionInfo hri) {
1660     Set<HRegionInfo> hris = servers.get(hsi);
1661     if (hris == null) {
1662       hris = new ConcurrentSkipListSet<HRegionInfo>();
1663       servers.put(hsi, hris);
1664     }
1665     hris.add(hri);  // Set semantics already make duplicate adds a no-op.
1666   }
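
  // Calling-convention sketch (illustrative): per the comment above, a caller
  // that may race with other threads should hold the lock on this.regions:
  //
  //   synchronized (this.regions) {
  //     this.regions.put(regionInfo, regionLocation);
  //     addToServers(regionLocation, regionInfo);
  //   }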
1667 
1668   /**
1669    * @return A copy of the Map of regions currently in transition.
1670    */
1671   public NavigableMap<String, RegionState> getRegionsInTransition() {
1672     synchronized (this.regionsInTransition) {
1673       return new TreeMap<String, RegionState>(this.regionsInTransition);
1674     }
1675   }
1676 
1677   /**
1678    * @return True if regions in transition.
1679    */
1680   public boolean isRegionsInTransition() {
1681     synchronized (this.regionsInTransition) {
1682       return !this.regionsInTransition.isEmpty();
1683     }
1684   }
1685 
1686   /**
1687    * @param hri Region to check.
1688    * @return Returns null if passed region is not in transition else the current
1689    * RegionState
1690    */
1691   public RegionState isRegionInTransition(final HRegionInfo hri) {
1692     synchronized (this.regionsInTransition) {
1693       return this.regionsInTransition.get(hri.getEncodedName());
1694     }
1695   }
1696 
1697   /**
1698    * Clears the specified region from being in transition.
1699    * <p>
1700    * Used only by HBCK tool.
1701    * @param hri
1702    */
1703   public void clearRegionFromTransition(HRegionInfo hri) {
1704     synchronized (this.regionsInTransition) {
1705       this.regionsInTransition.remove(hri.getEncodedName());
1706     }
1707     synchronized (this.regions) {
1708       this.regions.remove(hri);
1709       for (Set<HRegionInfo> regions : this.servers.values()) {
1710         regions.remove(hri);
1711       }
1712     }
1713     clearRegionPlan(hri);
1714   }
1715 
1716   /**
1717    * @param region Region whose plan we are to clear.
1718    */
1719   void clearRegionPlan(final HRegionInfo region) {
1720     synchronized (this.regionPlans) {
1721       this.regionPlans.remove(region.getEncodedName());
1722     }
1723   }
1724 
1725   /**
1726    * Wait on region to clear regions-in-transition.
1727    * @param hri Region to wait on.
1728    * @throws IOException
1729    */
1730   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1731   throws IOException {
1732     if (isRegionInTransition(hri) == null) return;
1733     RegionState rs = null;
1734     // There is already a timeout monitor on regions in transition, so we
1735     // should not need another one here.
1736     while(!this.master.isStopped() && (rs = isRegionInTransition(hri)) != null) {
1737       Threads.sleep(1000);
1738       LOG.info("Waiting on " + rs + " to clear regions-in-transition");
1739     }
1740     if (this.master.isStopped()) {
1741       LOG.info("Giving up wait on regions in " +
1742         "transition because stoppable.isStopped is set");
1743     }
1744   }
1745 
1746 
1747   /**
1748    * Gets the online regions of the specified table.
1749    * This method looks at the in-memory state.  It does not go to <code>.META.</code>.
1750    * Only returns <em>online</em> regions.  If a region on this table has been
1751    * closed during a disable, etc., it will not be included in the returned list.
1752    * So, the returned list is not necessarily ALL regions in this table; it is
1753    * all the ONLINE regions in the table.
1754    * @param tableName
1755    * @return Online regions from <code>tableName</code>
1756    */
1757   public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
1758     List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
1759     HRegionInfo boundary =
1760       new HRegionInfo(new HTableDescriptor(tableName), null, null);
1761     synchronized (this.regions) {
1762       for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
1763         if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
1764           tableRegions.add(regionInfo);
1765         } else {
1766           break;
1767         }
1768       }
1769     }
1770     return tableRegions;
1771   }
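
  // Why the boundary trick above works (descriptive note): HRegionInfo
  // ordering is by table name first, then start key, so a fabricated region
  // for tableName with a null start key sorts at or before every real region
  // of that table.  tailMap(boundary) therefore begins at the table's first
  // region, and the loop can break as soon as it sees a region from the next
  // table.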
1772 
1773   /**
1774    * Monitor to check for time outs on region transition operations
1775    */
1776   public class TimeoutMonitor extends Chore {
1777     private final int timeout;
1778     private boolean bulkAssign = false;
1779 
1780     /**
1781      * Creates a periodic monitor to check for time outs on region transition
1782      * operations.  This will deal with retries if for some reason something
1783      * doesn't happen within the specified timeout.
1784      * @param period
1785      * @param stopper When {@link Stoppable#isStopped()} is true, this thread
1786      * will clean up and exit cleanly.
1787      * @param timeout
1788      */
1789     public TimeoutMonitor(final int period, final Stoppable stopper,
1790         final int timeout) {
1791       super("AssignmentTimeoutMonitor", period, stopper);
1792       this.timeout = timeout;
1793     }
1794 
1795     /**
1796      * @param bulkAssign If true, we'll suspend checking regions in transition
1797      * up in zookeeper.  If false, will reenable check.
1798      * @return Old setting for bulkAssign.
1799      */
1800     public boolean bulkAssign(final boolean bulkAssign) {
1801       boolean result = this.bulkAssign;
1802       this.bulkAssign = bulkAssign;
1803       return result;
1804     }
1805 
1806     @Override
1807     protected void chore() {
1808       // If bulkAssign in progress, suspend checks
1809       if (this.bulkAssign) return;
1810       List<HRegionInfo> unassigns = new ArrayList<HRegionInfo>();
1811       Map<HRegionInfo, Boolean> assigns =
1812         new HashMap<HRegionInfo, Boolean>();
1813       synchronized (regionsInTransition) {
1814         // Iterate all regions in transition checking for time outs
1815         long now = System.currentTimeMillis();
1816         for (RegionState regionState : regionsInTransition.values()) {
1817           if (regionState.getStamp() + timeout <= now) {
1818             HRegionInfo regionInfo = regionState.getRegion();
1819             LOG.info("Region in transition timed out: " + regionState);
1820             // Expired!  Do a retry.
1821             switch (regionState.getState()) {
1822               case CLOSED:
1823                 LOG.info("Region " + regionInfo.getEncodedName() +
1824                   " has been CLOSED for too long, waiting on queued " +
1825                   "ClosedRegionHandler to run or server shutdown");
1826                 // Update our timestamp.
1827                 synchronized(regionState) {
1828                   regionState.update(regionState.getState());
1829                 }
1830                 break;
1831               case OFFLINE:
1832                 LOG.info("Region has been OFFLINE for too long, " +
1833                   "reassigning " + regionInfo.getRegionNameAsString() +
1834                   " to a random server");
1835                 assigns.put(regionState.getRegion(), Boolean.FALSE);
1836                 break;
1837               case PENDING_OPEN:
1838                 LOG.info("Region has been PENDING_OPEN for too " +
1839                     "long, reassigning region=" +
1840                     regionInfo.getRegionNameAsString());
1841                 assigns.put(regionState.getRegion(), Boolean.TRUE);
1842                 break;
1843               case OPENING:
1844                 LOG.info("Region has been OPENING for too " +
1845                   "long, reassigning region=" +
1846                   regionInfo.getRegionNameAsString());
1847                 // Should have a ZK node in OPENING state
1848                 try {
1849                   String node = ZKAssign.getNodeName(watcher,
1850                       regionInfo.getEncodedName());
1851                   Stat stat = new Stat();
1852                   RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
1853                       node, stat);
1854                   if (data == null) {
1855                     LOG.warn("Data is null, node " + node + " no longer exists");
1856                     break;
1857                   }
1858                   if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
1859                     LOG.debug("Region has transitioned to OPENED, allowing " +
1860                         "watched event handlers to process");
1861                     break;
1862                   } else if (data.getEventType() !=
1863                       EventType.RS_ZK_REGION_OPENING) {
1864                     LOG.warn("While timing out a region in state OPENING, " +
1865                         "found ZK node in unexpected state: " +
1866                         data.getEventType());
1867                     break;
1868                   }
1869                   // Attempt to transition node into OFFLINE
1870                   try {
1871                     data = new RegionTransitionData(
1872                       EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
1873                       master.getServerName());
1874                     if (ZKUtil.setData(watcher, node, data.getBytes(),
1875                         stat.getVersion())) {
1876                       // Node is now OFFLINE, let's trigger another assignment
1877                       ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
1878                       LOG.info("Successfully transitioned region=" +
1879                           regionInfo.getRegionNameAsString() + " into OFFLINE" +
1880                           " and forcing a new assignment");
1881                       assigns.put(regionState.getRegion(), Boolean.TRUE);
1882                     }
1883                   } catch (KeeperException.NoNodeException nne) {
1884                     // Node did not exist, can't time this out
1885                   }
1886                 } catch (KeeperException ke) {
1887                   LOG.error("Unexpected ZK exception timing out OPENING region",
1888                       ke);
1889                   break;
1890                 }
1891                 break;
1892               case OPEN:
1893                 LOG.error("Region has been OPEN for too long, " +
1894                 "we don't know where region was opened so can't do anything");
1895                 synchronized(regionState) {
1896                   regionState.update(regionState.getState());
1897                 }
1898                 break;
1899 
1900               case PENDING_CLOSE:
1901                 LOG.info("Region has been PENDING_CLOSE for too " +
1902                     "long, running forced unassign again on region=" +
1903                     regionInfo.getRegionNameAsString());
1904                 try {
1905                   // If the server got the RPC, it will transition the node
1906                   // to CLOSING, so only do something here if no node exists
1907                   if (!ZKUtil.watchAndCheckExists(watcher,
1908                     ZKAssign.getNodeName(watcher, regionInfo.getEncodedName()))) {
1909                     // Queue running of an unassign -- do the actual unassign
1910                     // outside of the regionsInTransition lock.
1911                     unassigns.add(regionInfo);
1912                   }
1913                 } catch (NoNodeException e) {
1914                   LOG.debug("Node no longer existed so not forcing another " +
1915                     "unassignment");
1916                 } catch (KeeperException e) {
1917                   LOG.warn("Unexpected ZK exception timing out a region " +
1918                     "close", e);
1919                 }
1920                 break;
1921               case CLOSING:
1922                 LOG.info("Region has been CLOSING for too " +
1923                   "long, this should eventually complete or the server will " +
1924                   "expire, doing nothing");
1925                 break;
1926             }
1927           }
1928         }
1929       }
1930       // Finish the work for regions in PENDING_CLOSE state
1931       for (HRegionInfo hri: unassigns) {
1932         unassign(hri, true);
1933       }
1934       for (Map.Entry<HRegionInfo, Boolean> e: assigns.entrySet()){
1935         assign(e.getKey(), false, e.getValue());
1936       }
1937     }
1938   }
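
  // Construction sketch (illustrative; the real period and timeout come from
  // configuration read in the AssignmentManager constructor, outside this
  // excerpt):
  //
  //   TimeoutMonitor monitor =
  //     new TimeoutMonitor(10 * 1000, master, 30 * 60 * 1000);
  //   Threads.setDaemonThreadRunning(monitor,
  //     master.getServerName() + ".timeoutMonitor");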
1939 
1940   /**
1941    * Process shutdown server removing any assignments.
1942    * @param hsi Server that went down.
1943    * @return list of regions in transition on this server
1944    */
1945   public List<RegionState> processServerShutdown(final HServerInfo hsi) {
1946     // Clean out any existing assignment plans for this server
1947     synchronized (this.regionPlans) {
1948       for (Iterator <Map.Entry<String, RegionPlan>> i =
1949           this.regionPlans.entrySet().iterator(); i.hasNext();) {
1950         Map.Entry<String, RegionPlan> e = i.next();
1951         HServerInfo otherHsi = e.getValue().getDestination();
1952         // The HSI will be null if the region is planned for a random assign.
1953         if (otherHsi != null && otherHsi.equals(hsi)) {
1954           // Use the iterator's remove, else we'll get a ConcurrentModificationException
1955           i.remove();
1956         }
1957       }
1958     }
1959     // TODO: Do we want to sync on RIT here?
1960     // Remove this server from map of servers to regions, and remove all regions
1961     // of this server from online map of regions.
1962     Set<HRegionInfo> deadRegions = null;
1963     List<RegionState> rits = new ArrayList<RegionState>();
1964     synchronized (this.regions) {
1965       Set<HRegionInfo> assignedRegions = this.servers.remove(hsi);
1966       if (assignedRegions == null || assignedRegions.isEmpty()) {
1967         // No regions on this server, we are done, return empty list of RITs
1968         return rits;
1969       }
1970       deadRegions = new TreeSet<HRegionInfo>(assignedRegions);
1971       for (HRegionInfo region : deadRegions) {
1972         this.regions.remove(region);
1973       }
1974     }
1975     // See if any of the regions that were online on this server were in RIT
1976     // If they are, normal timeouts will deal with them appropriately so
1977     // let's skip a manual re-assignment.
1978     synchronized (regionsInTransition) {
1979       for (RegionState region : this.regionsInTransition.values()) {
1980         if (deadRegions.remove(region.getRegion())) {
1981           rits.add(region);
1982         }
1983       }
1984     }
1985     return rits;
1986   }
1987 
1988   /**
1989    * Update in-memory structures.
1990    * @param hsi Server that reported the split
1991    * @param parent Parent region that was split
1992    * @param a Daughter region A
1993    * @param b Daughter region B
1994    */
1995   public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
1996       final HRegionInfo a, final HRegionInfo b) {
1997     regionOffline(parent);
1998     // Remove any CLOSING node, if exists, due to race between master & rs
1999     // for close & split.  Not putting into regionOffline method because it is
2000     // called from various locations.
2001     try {
2002       RegionTransitionData node = ZKAssign.getDataNoWatch(this.watcher,
2003         parent.getEncodedName(), null);
2004       if (node != null) {
2005         if (node.getEventType().equals(EventType.RS_ZK_REGION_CLOSING)) {
2006           ZKAssign.deleteClosingNode(this.watcher, parent);
2007         } else {
2008           LOG.warn("Split report has RIT node (shouldn't have one): " +
2009             parent + " node: " + node);
2010         }
2011       }
2012     } catch (KeeperException e) {
2013       LOG.warn("Exception while validating RIT during split report", e);
2014     }
2015     synchronized (this.regions) {
2016       // One daughter is already online; do nothing
2017       HServerInfo hsia = this.regions.get(a);
2018       if (hsia != null) {
2019         LOG.warn("Trying to process the split of " + a.getEncodedName() + ", " +
2020           "but it was already done and one daughter is on region server " + hsia);
2021         return;
2022       }
2023     }
2024 
2025     regionOnline(a, hsi);
2026     regionOnline(b, hsi);
2027 
2028     // There's a possibility that the region was splitting while a user asked
2029     // the master to disable, we need to make sure we close those regions in
2030     // that case. This is not racing with the region server itself since RS
2031     // report is done after the split transaction completed.
2032     if (this.zkTable.isDisablingOrDisabledTable(
2033         parent.getTableDesc().getNameAsString())) {
2034       unassign(a);
2035       unassign(b);
2036     }
2037   }
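
  // Split-report flow in brief (descriptive note): the parent is taken
  // offline, any leftover CLOSING znode from a close-versus-split race is
  // deleted, both daughters are brought online on the reporting server, and
  // if the table is meanwhile DISABLING or DISABLED the daughters are
  // immediately unassigned again.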
2038 
2039   /**
2040    * @return A clone of current assignments. Note, this is assignments only.
2041    * If a new server has come in and it has no regions, it will not be included
2042    * in the returned Map.
2043    */
2044   Map<HServerInfo, List<HRegionInfo>> getAssignments() {
2045     // This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
2046     // We can't hand out the original since it can change, and at least the load
2047     // balancer wants to iterate the exported list.  We need to synchronize on
2048     // regions since all access to this.servers is under a lock on this.regions.
2049     Map<HServerInfo, List<HRegionInfo>> result = null;
2050     synchronized (this.regions) {
2051       result = new HashMap<HServerInfo, List<HRegionInfo>>(this.servers.size());
2052       for (Map.Entry<HServerInfo, Set<HRegionInfo>> e: this.servers.entrySet()) {
2053         List<HRegionInfo> shallowCopy = new ArrayList<HRegionInfo>(e.getValue());
2054         HServerInfo clone = new HServerInfo(e.getKey());
2055         // Set into the server load the number of regions this server is carrying.
2056         // The load balancer calculation needs it, and it is handy to have.
2057         clone.getLoad().setNumberOfRegions(e.getValue().size());
2058         result.put(clone, shallowCopy);
2059       }
2060     }
2061     return result;
2062   }
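
  // Usage sketch (illustrative): the snapshot is safe to hand to the load
  // balancer because keys and values are copies:
  //
  //   Map<HServerInfo, List<HRegionInfo>> snapshot = am.getAssignments();
  //   // mutating "snapshot" does not touch the AssignmentManager's state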
2063 
2064   /**
2065    * @param encodedRegionName Region encoded name.
2066    * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
2067    * and the hosting server's {@link HServerInfo}.
2068    */
2069   Pair<HRegionInfo, HServerInfo> getAssignment(final byte [] encodedRegionName) {
2070     String name = Bytes.toString(encodedRegionName);
2071     synchronized(this.regions) {
2072       for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
2073         if (e.getKey().getEncodedName().equals(name)) {
2074           return new Pair<HRegionInfo, HServerInfo>(e.getKey(), e.getValue());
2075         }
2076       }
2077     }
2078     return null;
2079   }
2080 
2081   /**
2082    * @param plan Plan to execute.
2083    */
2084   void balance(final RegionPlan plan) {
2085     synchronized (this.regionPlans) {
2086       this.regionPlans.put(plan.getRegionName(), plan);
2087     }
2088     unassign(plan.getRegionInfo());
2089   }
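
  // How a balance round uses this (descriptive sketch): for each RegionPlan
  // the LoadBalancer produces, the master calls balance(plan); the unassign
  // closes the region, and the plan parked in regionPlans steers the
  // subsequent reassignment toward plan.getDestination().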
2090 
2091   /**
2092    * Assigns the given list of user regions in round-robin fashion, if any.
2093    * @param regions Regions to assign.
2094    * @param servers Candidate servers to assign the regions across.
2095    * @param sync True if we are to wait on all assigns.
2096    * @throws IOException
2097    */
2098   void bulkAssignUserRegions(final HRegionInfo [] regions,
2099       final List<HServerInfo> servers, final boolean sync)
2100   throws IOException {
2101     Map<HServerInfo, List<HRegionInfo>> bulkPlan =
2102       LoadBalancer.roundRobinAssignment(java.util.Arrays.asList(regions), servers);
2103     LOG.info("Bulk assigning " + regions.length + " region(s) " +
2104       "round-robin across " + servers.size() + " server(s)");
2105     // Use a fixed-count thread pool to do the assigning.
2106     BulkAssigner ba = new GeneralBulkAssigner(this.master, bulkPlan, this);
2107     try {
2108       ba.bulkAssign(sync);
2109     } catch (InterruptedException e) {
2110       throw new IOException("InterruptedException bulk assigning", e);
2111     }
2112     LOG.info("Bulk assigning done");
2113   }
2114 
2115   /**
2116    * State of a Region while undergoing transitions.
2117    */
2118   public static class RegionState implements Writable {
2119     private HRegionInfo region;
2120 
2121     public enum State {
2122       OFFLINE,        // region is in an offline state
2123       PENDING_OPEN,   // sent rpc to server to open but has not begun
2124       OPENING,        // server has begun to open but not yet done
2125       OPEN,           // server opened region and updated meta
2126       PENDING_CLOSE,  // sent rpc to server to close but has not begun
2127       CLOSING,        // server has begun to close but not yet done
2128       CLOSED          // server closed region and updated meta
2129     }
2130 
2131     private State state;
2132     private long stamp;
2133 
2134     public RegionState() {}
2135 
2136     RegionState(HRegionInfo region, State state) {
2137       this(region, state, System.currentTimeMillis());
2138     }
2139 
2140     RegionState(HRegionInfo region, State state, long stamp) {
2141       this.region = region;
2142       this.state = state;
2143       this.stamp = stamp;
2144     }
2145 
2146     public void update(State state, long stamp) {
2147       this.state = state;
2148       this.stamp = stamp;
2149     }
2150 
2151     public void update(State state) {
2152       this.state = state;
2153       this.stamp = System.currentTimeMillis();
2154     }
2155 
2156     public State getState() {
2157       return state;
2158     }
2159 
2160     public long getStamp() {
2161       return stamp;
2162     }
2163 
2164     public HRegionInfo getRegion() {
2165       return region;
2166     }
2167 
2168     public boolean isClosing() {
2169       return state == State.CLOSING;
2170     }
2171 
2172     public boolean isClosed() {
2173       return state == State.CLOSED;
2174     }
2175 
2176     public boolean isPendingClose() {
2177       return state == State.PENDING_CLOSE;
2178     }
2179 
2180     public boolean isOpening() {
2181       return state == State.OPENING;
2182     }
2183 
2184     public boolean isOpened() {
2185       return state == State.OPEN;
2186     }
2187 
2188     public boolean isPendingOpen() {
2189       return state == State.PENDING_OPEN;
2190     }
2191 
2192     public boolean isOffline() {
2193       return state == State.OFFLINE;
2194     }
2195 
2196     @Override
2197     public String toString() {
2198       return region.getRegionNameAsString() + " state=" + state +
2199         ", ts=" + stamp;
2200     }
2201 
2202     @Override
2203     public void readFields(DataInput in) throws IOException {
2204       region = new HRegionInfo();
2205       region.readFields(in);
2206       state = State.valueOf(in.readUTF());
2207       stamp = in.readLong();
2208     }
2209 
2210     @Override
2211     public void write(DataOutput out) throws IOException {
2212       region.write(out);
2213       out.writeUTF(state.name());
2214       out.writeLong(stamp);
2215     }
2216   }
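
  // Serialization round-trip sketch (illustrative; "out"/"in" are a
  // DataOutput/DataInput pair and "hri" an HRegionInfo):
  //
  //   RegionState sent = new RegionState(hri, RegionState.State.OPENING);
  //   sent.write(out);          // writes the region, the state name, the stamp
  //   RegionState received = new RegionState();
  //   received.readFields(in);  // restores the same three fields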
2217 
2218   public void stop() {
2219     this.timeoutMonitor.interrupt();
2220   }
2221   
2222   /**
2223    * Check whether the RegionServer is online.
2224    */
2225   public boolean isServerOnline(String serverName) {
2226     return this.serverManager.isServerOnline(serverName);
2227   }
2228 }