View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.lang.Thread.UncaughtExceptionHandler;
24  import java.lang.annotation.Retention;
25  import java.lang.annotation.RetentionPolicy;
26  import java.lang.management.ManagementFactory;
27  import java.lang.management.MemoryUsage;
28  import java.lang.reflect.Constructor;
29  import java.lang.reflect.Method;
30  import java.net.BindException;
31  import java.net.InetSocketAddress;
32  import java.util.ArrayList;
33  import java.util.Collection;
34  import java.util.Collections;
35  import java.util.Comparator;
36  import java.util.HashMap;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.LinkedList;
41  import java.util.List;
42  import java.util.Map;
43  import java.util.Random;
44  import java.util.Set;
45  import java.util.SortedMap;
46  import java.util.TreeMap;
47  import java.util.concurrent.ConcurrentSkipListSet;
48  import java.util.concurrent.LinkedBlockingQueue;
49  import java.util.concurrent.TimeUnit;
50  import java.util.concurrent.atomic.AtomicBoolean;
51  import java.util.concurrent.atomic.AtomicInteger;
52  import java.util.concurrent.locks.ReentrantReadWriteLock;
53  
54  import com.google.common.collect.Lists;
55  import org.apache.commons.logging.Log;
56  import org.apache.commons.logging.LogFactory;
57  import org.apache.hadoop.conf.Configuration;
58  import org.apache.hadoop.fs.FileSystem;
59  import org.apache.hadoop.fs.Path;
60  import org.apache.hadoop.hbase.Chore;
61  import org.apache.hadoop.hbase.ClockOutOfSyncException;
62  import org.apache.hadoop.hbase.DoNotRetryIOException;
63  import org.apache.hadoop.hbase.HBaseConfiguration;
64  import org.apache.hadoop.hbase.HConstants;
65  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
66  import org.apache.hadoop.hbase.HMsg;
67  import org.apache.hadoop.hbase.HRegionInfo;
68  import org.apache.hadoop.hbase.HServerAddress;
69  import org.apache.hadoop.hbase.HServerInfo;
70  import org.apache.hadoop.hbase.HServerLoad;
71  import org.apache.hadoop.hbase.KeyValue;
72  import org.apache.hadoop.hbase.MasterAddressTracker;
73  import org.apache.hadoop.hbase.NotServingRegionException;
74  import org.apache.hadoop.hbase.RemoteExceptionHandler;
75  import org.apache.hadoop.hbase.Server;
76  import org.apache.hadoop.hbase.Stoppable;
77  import org.apache.hadoop.hbase.UnknownRowLockException;
78  import org.apache.hadoop.hbase.UnknownScannerException;
79  import org.apache.hadoop.hbase.YouAreDeadException;
80  import org.apache.hadoop.hbase.catalog.CatalogTracker;
81  import org.apache.hadoop.hbase.catalog.MetaEditor;
82  import org.apache.hadoop.hbase.catalog.RootLocationEditor;
83  import org.apache.hadoop.hbase.client.Action;
84  import org.apache.hadoop.hbase.client.Delete;
85  import org.apache.hadoop.hbase.client.Get;
86  import org.apache.hadoop.hbase.client.HConnection;
87  import org.apache.hadoop.hbase.client.HConnectionManager;
88  import org.apache.hadoop.hbase.client.Increment;
89  import org.apache.hadoop.hbase.client.MultiAction;
90  import org.apache.hadoop.hbase.client.MultiPut;
91  import org.apache.hadoop.hbase.client.MultiPutResponse;
92  import org.apache.hadoop.hbase.client.MultiResponse;
93  import org.apache.hadoop.hbase.client.Put;
94  import org.apache.hadoop.hbase.client.Result;
95  import org.apache.hadoop.hbase.client.Row;
96  import org.apache.hadoop.hbase.client.Scan;
97  import org.apache.hadoop.hbase.executor.ExecutorService;
98  import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
99  import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
100 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
101 import org.apache.hadoop.hbase.ipc.HBaseRPC;
102 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
103 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
104 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
105 import org.apache.hadoop.hbase.ipc.HBaseServer;
106 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
107 import org.apache.hadoop.hbase.ipc.HRegionInterface;
108 import org.apache.hadoop.hbase.ipc.ServerNotRunningException;
109 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
110 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
111 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
112 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
113 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
114 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
115 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
116 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
117 import org.apache.hadoop.hbase.regionserver.wal.HLog;
118 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
119 import org.apache.hadoop.hbase.replication.regionserver.Replication;
120 import org.apache.hadoop.hbase.security.User;
121 import org.apache.hadoop.hbase.util.Bytes;
122 import org.apache.hadoop.hbase.util.CompressionTest;
123 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
124 import org.apache.hadoop.hbase.util.FSUtils;
125 import org.apache.hadoop.hbase.util.InfoServer;
126 import org.apache.hadoop.hbase.util.Pair;
127 import org.apache.hadoop.hbase.util.Sleeper;
128 import org.apache.hadoop.hbase.util.Strings;
129 import org.apache.hadoop.hbase.util.Threads;
130 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
131 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
132 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
133 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
134 import org.apache.hadoop.io.MapWritable;
135 import org.apache.hadoop.io.Writable;
136 import org.apache.hadoop.ipc.RemoteException;
137 import org.apache.hadoop.net.DNS;
138 import org.apache.zookeeper.KeeperException;
139 
140 import com.google.common.base.Function;
141 
142 /**
143  * HRegionServer makes a set of HRegions available to clients. It checks in with
144  * the HMaster. There are many HRegionServers in a single HBase deployment.
145  */
146 public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
147     Runnable, RegionServerServices, Server {
148   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
149 
150   // Set when a report to the master comes back with a message asking us to
151   // shutdown. Also set by call to stop when debugging or running unit tests
152   // of HRegionServer in isolation.
153   protected volatile boolean stopped = false;
154 
155   // A state before we go into stopped state.  At this stage we're closing user
156   // space regions.
      // Flipped to true by run() the first time it sees the cluster going down
      // while regions are still online.
      // NOTE(review): unlike the neighbouring flags this one is not volatile --
      // confirm it is only touched from the run() thread.
157   private boolean stopping = false;
158 
159   // Go down hard. Used if file system becomes unavailable and also in
160   // debugging and unit tests.
161   protected volatile boolean abortRequested;
162 
      // When set, run() skips closing regions on its way out.
163   private volatile boolean killed = false;
164 
165   // If false, the file system has become unavailable
166   protected volatile boolean fsOk;
167 
      // Identity and load of this server as reported to the master; created in
      // the constructor and refreshed with the current load on every report.
168   protected HServerInfo serverInfo;
169   protected final Configuration conf;
170 
171   private final HConnection connection;
172   protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
173   private FileSystem fs;
174   private Path rootDir;
175   private final Random rand = new Random();
176 
177   /**
178    * Map of regions currently being served by this region server. Key is the
179    * encoded region name.  All access should be synchronized.
       * NOTE(review): the backing map is a ConcurrentHashMap, so the
       * "synchronized" advice above may be stale -- confirm against the accessors.
180    */
181   protected final Map<String, HRegion> onlineRegions =
182     new ConcurrentHashMap<String, HRegion>();
183 
184   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      // Messages waiting to be sent to the master. run() polls this queue between
      // reports and addOutboundMsgs() copies its content into the outgoing list.
185   private final LinkedBlockingQueue<HMsg> outboundMsgs = new LinkedBlockingQueue<HMsg>();
186 
      // Read from "hbase.client.retries.number"; run() checks the file system
      // every numRetries consecutive failed reports.
187   final int numRetries;
      // Read from HConstants.THREAD_WAKE_FREQUENCY.
188   protected final int threadWakeFrequency;
      // Read from "hbase.regionserver.msginterval"; used in run() as the minimum
      // gap, in milliseconds, between reports to the master.
189   private final int msgInterval;
190 
      // Read from "hbase.regionserver.numregionstoreport".
191   protected final int numRegionsToReport;
192 
      // Read from HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY.
193   private final long maxScannerResultSize;
194 
195   // Remote HMaster
196   private HMasterRegionInterface hbaseMaster;
197 
198   // Server to handle client requests. Default access so can be accessed by
199   // unit tests.
200   HBaseServer server;
201 
202   // Leases
203   private Leases leases;
204 
205   // Request counter
      // Reported to the master as part of the server load and reset to zero by
      // tryRegionServerReport() right after the load has been built.
206   private volatile AtomicInteger requestCount = new AtomicInteger();
207 
208   // Info server. Default access so can be used by unit tests. REGIONSERVER
209   // is name of the webapp and the attribute name used stuffing this instance
210   // into web context.
211   InfoServer infoServer;
212 
213   /** region server process name */
214   public static final String REGIONSERVER = "regionserver";
215 
216   /*
217    * Space is reserved in HRS constructor and then released when aborting to
218    * recover from an OOME. See HBASE-706. TODO: Make this percentage of the heap
219    * or a minimum.
       * NOTE(review): in the code visible here the blocks are allocated by
       * initialize(), not by the constructor.
220    */
221   private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
222 
223   private RegionServerMetrics metrics;
224 
225   // Compactions
226   CompactSplitThread compactSplitThread;
227 
228   // Cache flushing
229   MemStoreFlusher cacheFlusher;
230 
231   /*
232    * Check for major compactions.
233    */
234   Chore majorCompactionChecker;
235 
236   // HLog and HLog roller. log is protected rather than private to avoid
237   // eclipse warning when accessed by inner classes
238   protected volatile HLog hlog;
239   LogRoller hlogRoller;
240 
241   // flag set after we're done setting up server threads (used for testing)
242   protected volatile boolean isOnline;
243 
      // Open scanners. QosFunction looks entries up by Long.toString(scannerId).
244   final Map<String, InternalScanner> scanners = new ConcurrentHashMap<String, InternalScanner>();
245 
246   // zookeeper connection and watcher
247   private ZooKeeperWatcher zooKeeper;
248 
249   // master address manager and watcher
250   private MasterAddressTracker masterAddressManager;
251 
252   // catalog tracker
253   private CatalogTracker catalogTracker;
254 
255   // Cluster Status Tracker
256   private ClusterStatusTracker clusterStatusTracker;
257 
258   // A sleeper that sleeps for msgInterval.
259   private final Sleeper sleeper;
260 
      // Read from HConstants.HBASE_RPC_TIMEOUT_KEY.
261   private final int rpcTimeout;
262 
263   // The main region server thread.
264   @SuppressWarnings("unused")
265   private Thread regionServerThread;
266 
267   // Instance of the hbase executor service.
268   private ExecutorService service;
269 
270   // Replication services. If no replication, this handler will be null.
271   private Replication replicationHandler;
272 
      // Set of region names ordered by Bytes.BYTES_COMPARATOR.
      // NOTE(review): presumably the regions currently being opened or closed on
      // this server -- confirm against the open/close handlers.
273   private final Set<byte[]> regionsInTransitionInRS =
274       new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
275 
276   /**
277    * Starts a HRegionServer at the default location
278    *
279    * @param conf
280    * @throws IOException
281    * @throws InterruptedException
282    */
283   public HRegionServer(Configuration conf) throws IOException, InterruptedException {
284     this.fsOk = true;
285     this.conf = conf;
286     this.connection = HConnectionManager.getConnection(conf);
287     this.isOnline = false;
288 
289     // check to see if the codec list is available:
290     String [] codecs = conf.getStrings("hbase.regionserver.codecs",
291         (String[])null);
292     if (codecs != null) {
293       for (String codec : codecs) {
294         if (!CompressionTest.testCompression(codec)) {
295           throw new IOException("Compression codec " + codec +
296               " not supported, aborting RS construction");
297         }
298       }
299     }
300 
301     // Config'ed params
302     this.numRetries = conf.getInt("hbase.client.retries.number", 10);
303     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
304         10 * 1000);
305     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
306 
307     sleeper = new Sleeper(this.msgInterval, this);
308 
309     this.maxScannerResultSize = conf.getLong(
310         HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
311         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
312 
313     this.numRegionsToReport = conf.getInt(
314         "hbase.regionserver.numregionstoreport", 10);
315 
316     this.rpcTimeout = conf.getInt(
317         HConstants.HBASE_RPC_TIMEOUT_KEY,
318         HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
319 
320     this.abortRequested = false;
321     this.stopped = false;
322 
323     // Server to handle client requests
324     String machineName = Strings.domainNamePointerToHostName(
325         DNS.getDefaultHost(conf.get("hbase.regionserver.dns.interface", "default")
326             , conf.get("hbase.regionserver.dns.nameserver", "default")));
327     String addressStr = machineName + ":" +
328       conf.get(HConstants.REGIONSERVER_PORT,
329         Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
330     HServerAddress address = new HServerAddress(addressStr);
331     this.server = HBaseRPC.getServer(this,
332         new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
333         OnlineRegions.class},
334         address.getBindAddress(),
335       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
336         conf.getInt("hbase.regionserver.metahandler.count", 10),
337         false, conf, QOS_THRESHOLD);
338     this.server.setErrorHandler(this);
339     this.server.setQosFunction(new QosFunction());
340 
341     // HServerInfo can be amended by master.  See below in reportForDuty.
342     this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
343         address.getBindAddress(), this.server.getListenerAddress().getPort())),
344         System.currentTimeMillis(), this.conf.getInt(
345             "hbase.regionserver.info.port", 60030), machineName);
346     if (this.serverInfo.getServerAddress() == null) {
347       throw new NullPointerException("Server address cannot be null; "
348           + "hbase-958 debugging");
349     }
350 
351     // login the server principal (if using secure Hadoop)
352     User.login(conf, "hbase.regionserver.keytab.file",
353         "hbase.regionserver.kerberos.principal", serverInfo.getHostname());
354   }
355 
      // QoS levels handed out by QosFunction.apply(). QOS_THRESHOLD is also passed
      // to HBaseRPC.getServer() by the constructor.
356   private static final int NORMAL_QOS = 0;
357   private static final int QOS_THRESHOLD = 10;  // the line between low and high qos
358   private static final int HIGH_QOS = 100;
359 
      /**
       * Tags a method with a fixed QoS priority. Retained at runtime because the
       * QosFunction constructor reads it reflectively through
       * Method.getAnnotation().
       */
360   @Retention(RetentionPolicy.RUNTIME)
361   private @interface QosPriority {
        /** Priority to use for calls of the annotated method; defaults to 0. */
362     int priority() default 0;
363   }
364 
365   class QosFunction implements Function<Writable,Integer> {
366     private final Map<String, Integer> annotatedQos;
367 
368     public QosFunction() {
369       Map<String, Integer> qosMap = new HashMap<String, Integer>();
370       for (Method m : HRegionServer.class.getMethods()) {
371         QosPriority p = m.getAnnotation(QosPriority.class);
372         if (p != null) {
373           qosMap.put(m.getName(), p.priority());
374         }
375       }
376       
377       annotatedQos = qosMap;
378     }
379 
380     public boolean isMetaRegion(byte[] regionName) {
381       HRegion region;
382       try {
383         region = getRegion(regionName);
384       } catch (NotServingRegionException ignored) {
385         return false;
386       }
387       return region.getRegionInfo().isMetaRegion();
388     }
389 
390     @Override
391     public Integer apply(Writable from) {
392       if (!(from instanceof HBaseRPC.Invocation)) return NORMAL_QOS;
393 
394       HBaseRPC.Invocation inv = (HBaseRPC.Invocation) from;
395       String methodName = inv.getMethodName();
396       
397       Integer priorityByAnnotation = annotatedQos.get(methodName);
398       if (priorityByAnnotation != null) {
399         return priorityByAnnotation;
400       }
401 
402       // scanner methods...
403       if (methodName.equals("next") || methodName.equals("close")) {
404         // translate!
405         Long scannerId;
406         try {
407           scannerId = (Long) inv.getParameters()[0];
408         } catch (ClassCastException ignored) {
409           // LOG.debug("Low priority: " + from);
410           return NORMAL_QOS; // doh.
411         }
412         String scannerIdString = Long.toString(scannerId);
413         InternalScanner scanner = scanners.get(scannerIdString);
414         if (scanner instanceof HRegion.RegionScanner) {
415           HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner;
416           HRegionInfo regionName = rs.getRegionName();
417           if (regionName.isMetaRegion()) {
418             // LOG.debug("High priority scanner request: " + scannerId);
419             return HIGH_QOS;
420           }
421         }
422       } else if (inv.getParameterClasses().length == 0) {
423        // Just let it through.  This is getOnlineRegions, etc.
424       } else if (inv.getParameterClasses()[0] == byte[].class) {
425         // first arg is byte array, so assume this is a regionname:
426         if (isMetaRegion((byte[]) inv.getParameters()[0])) {
427           // LOG.debug("High priority with method: " + methodName +
428           // " and region: "
429           // + Bytes.toString((byte[]) inv.getParameters()[0]));
430           return HIGH_QOS;
431         }
432       } else if (inv.getParameterClasses()[0] == MultiAction.class) {
433         MultiAction ma = (MultiAction) inv.getParameters()[0];
434         Set<byte[]> regions = ma.getRegions();
435         // ok this sucks, but if any single of the actions touches a meta, the
436         // whole
437         // thing gets pingged high priority. This is a dangerous hack because
438         // people
439         // can get their multi action tagged high QOS by tossing a Get(.META.)
440         // AND this
441         // regionserver hosts META/-ROOT-
442         for (byte[] region : regions) {
443           if (isMetaRegion(region)) {
444             // LOG.debug("High priority multi with region: " +
445             // Bytes.toString(region));
446             return HIGH_QOS; // short circuit for the win.
447           }
448         }
449       }
450       // LOG.debug("Low priority: " + from.toString());
451       return NORMAL_QOS;
452     }
453   }
454 
455   /**
456    * Creates all of the state that needs to be reconstructed in case we are
457    * doing a restart. This is shared between the constructor and restart(). Both
458    * call it.
459    *
460    * @throws IOException
461    * @throws InterruptedException
462    */
463   private void initialize() throws IOException, InterruptedException {
464     try {
465       initializeZooKeeper();
466       initializeThreads();
467       int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
468       for (int i = 0; i < nbBlocks; i++) {
469         reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
470       }
471     } catch (Throwable t) {
472       // Call stop if error or process will stick around for ever since server
473       // puts up non-daemon threads.
474       LOG.error("Stopping HRS because failed initialize", t);
475       this.server.stop();
476     }
477   }
478 
479   /**
480    * Bring up connection to zk ensemble and then wait until a master for this
481    * cluster and then after that, wait until cluster 'up' flag has been set.
482    * This is the order in which master does things.
483    * Finally put up a catalog tracker.
484    * @throws IOException
485    * @throws InterruptedException
486    */
487   private void initializeZooKeeper() throws IOException, InterruptedException {
488     // Open connection to zookeeper and set primary watcher
489     zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
490       serverInfo.getServerAddress().getPort(), this);
491 
492     // Create the master address manager, register with zk, and start it.  Then
493     // block until a master is available.  No point in starting up if no master
494     // running.
495     this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
496     this.masterAddressManager.start();
497     blockAndCheckIfStopped(this.masterAddressManager);
498 
499     // Wait on cluster being up.  Master will set this flag up in zookeeper
500     // when ready.
501     this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
502     this.clusterStatusTracker.start();
503     blockAndCheckIfStopped(this.clusterStatusTracker);
504 
505     // Create the catalog tracker and start it;
506     this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
507       this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
508     catalogTracker.start();
509   }
510 
511   /**
512    * Utilty method to wait indefinitely on a znode availability while checking
513    * if the region server is shut down
514    * @param tracker znode tracker to use
515    * @throws IOException any IO exception, plus if the RS is stopped
516    * @throws InterruptedException
517    */
518   private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
519       throws IOException, InterruptedException {
520     while (tracker.blockUntilAvailable(this.msgInterval) == null) {
521       if (this.stopped) {
522         throw new IOException("Received the shutdown message while waiting.");
523       }
524     }
525   }
526 
527   /**
528    * @return False if cluster shutdown in progress
529    */
530   private boolean isClusterUp() {
531     return this.clusterStatusTracker.isClusterUp();
532   }
533 
534   private void initializeThreads() throws IOException {
535 
536     // Cache flushing thread.
537     this.cacheFlusher = new MemStoreFlusher(conf, this);
538 
539     // Compaction thread
540     this.compactSplitThread = new CompactSplitThread(this);
541 
542     // Background thread to check for major compactions; needed if region
543     // has not gotten updates in a while. Make it run at a lesser frequency.
544     int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY
545         + ".multiplier", 1000);
546     this.majorCompactionChecker = new MajorCompactionChecker(this,
547         this.threadWakeFrequency * multiplier, this);
548 
549     this.leases = new Leases((int) conf.getLong(
550         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
551         HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
552         this.threadWakeFrequency);
553   }
554 
555   /**
556    * The HRegionServer sticks in this loop until closed. It repeatedly checks in
557    * with the HMaster, sending heartbeats & reports, and receiving HRegion
558    * load/unload instructions.
559    */
560   public void run() {
561 
562     try {
563       // Initialize threads and wait for a master
564       initialize();
565     } catch (Exception e) {
566       abort("Fatal exception during initialization", e);
567     }
568 
569     this.regionServerThread = Thread.currentThread();
570     try {
571       while (!this.stopped) {
572         if (tryReportForDuty()) break;
573       }
574       long lastMsg = 0;
575       List<HMsg> outboundMessages = new ArrayList<HMsg>();
576       // The main run loop.
577       for (int tries = 0; !this.stopped && isHealthy();) {
578         if (!isClusterUp()) {
579           if (isOnlineRegionsEmpty()) {
580             stop("Exiting; cluster shutdown set and not carrying any regions");
581           } else if (!this.stopping) {
582             this.stopping = true;
583             closeUserRegions(this.abortRequested);
584           } else if (this.stopping && LOG.isDebugEnabled()) {
585             LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
586           }
587         }
588         long now = System.currentTimeMillis();
589         // Drop into the send loop if msgInterval has elapsed or if something
590         // to send. If we fail talking to the master, then we'll sleep below
591         // on poll of the outboundMsgs blockingqueue.
592         if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
593           try {
594             doMetrics();
595             tryRegionServerReport(outboundMessages);
596             lastMsg = System.currentTimeMillis();
597             // Reset tries count if we had a successful transaction.
598             tries = 0;
599             if (this.stopped) continue;
600           } catch (Exception e) { // FindBugs REC_CATCH_EXCEPTION
601             // Two special exceptions could be printed out here,
602             // PleaseHoldException and YouAreDeadException
603             if (e instanceof IOException) {
604               e = RemoteExceptionHandler.checkIOException((IOException) e);
605             }
606             if (e instanceof YouAreDeadException) {
607               // This will be caught and handled as a fatal error below
608               throw e;
609             }
610             tries++;
611             if (tries > 0 && (tries % this.numRetries) == 0) {
612               // Check filesystem every so often.
613               checkFileSystem();
614             }
615             if (this.stopped) {
616               continue;
617             }
618             LOG.warn("Attempt=" + tries, e);
619             // No point retrying immediately; this is probably connection to
620             // master issue. Doing below will cause us to sleep.
621             lastMsg = System.currentTimeMillis();
622           }
623         }
624         now = System.currentTimeMillis();
625         HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), TimeUnit.MILLISECONDS);
626         if (msg != null) outboundMessages.add(msg);
627       } // for
628     } catch (Throwable t) {
629       if (!checkOOME(t)) {
630         abort("Unhandled exception: " + t.getMessage(), t);
631       }
632     }
633     this.leases.closeAfterLeasesExpire();
634     this.server.stop();
635     if (this.infoServer != null) {
636       LOG.info("Stopping infoServer");
637       try {
638         this.infoServer.stop();
639       } catch (Exception e) {
640         e.printStackTrace();
641       }
642     }
643     // Send cache a shutdown.
644     LruBlockCache c = (LruBlockCache) StoreFile.getBlockCache(this.conf);
645     if (c != null) {
646       c.shutdown();
647     }
648 
649     // Send interrupts to wake up threads if sleeping so they notice shutdown.
650     // TODO: Should we check they are alive? If OOME could have exited already
651     if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
652     if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
653     if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
654     if (this.majorCompactionChecker != null) this.majorCompactionChecker.interrupt();
655 
656     if (this.killed) {
657       // Just skip out w/o closing regions.
658     } else if (abortRequested) {
659       if (this.fsOk) {
660         closeAllRegions(abortRequested); // Don't leave any open file handles
661         closeWAL(false);
662       }
663       LOG.info("aborting server at: " + this.serverInfo.getServerName());
664     } else {
665       closeAllRegions(abortRequested);
666       closeWAL(true);
667       closeAllScanners();
668       LOG.info("stopping server at: " + this.serverInfo.getServerName());
669     }
670     // Interrupt catalog tracker here in case any regions being opened out in
671     // handlers are stuck waiting on meta or root.
672     if (this.catalogTracker != null) this.catalogTracker.stop();
673     if (this.fsOk) waitOnAllRegionsToClose();
674 
675     // Make sure the proxy is down.
676     if (this.hbaseMaster != null) {
677       HBaseRPC.stopProxy(this.hbaseMaster);
678       this.hbaseMaster = null;
679     }
680     this.leases.close();
681     HConnectionManager.deleteConnection(conf, true);
682     this.zooKeeper.close();
683     if (!killed) {
684       join();
685     }
686     LOG.info(Thread.currentThread().getName() + " exiting");
687   }
688 
689   String getOnlineRegionsAsPrintableString() {
690     StringBuilder sb = new StringBuilder();
691     for (HRegion r: this.onlineRegions.values()) {
692       if (sb.length() > 0) sb.append(", ");
693       sb.append(r.getRegionInfo().getEncodedName());
694     }
695     return sb.toString();
696   }
697 
698   /**
699    * Wait on regions close.
700    */
701   private void waitOnAllRegionsToClose() {
702     // Wait till all regions are closed before going out.
703     int lastCount = -1;
704     while (!isOnlineRegionsEmpty()) {
705       int count = getNumberOfOnlineRegions();
706       // Only print a message if the count of regions has changed.
707       if (count != lastCount) {
708         lastCount = count;
709         LOG.info("Waiting on " + count + " regions to close");
710         // Only print out regions still closing if a small number else will
711         // swamp the log.
712         if (count < 10 && LOG.isDebugEnabled()) {
713           LOG.debug(this.onlineRegions);
714         }
715       }
716       Threads.sleep(1000);
717     }
718   }
719 
720   List<HMsg> tryRegionServerReport(final List<HMsg> outboundMessages)
721   throws IOException {
722     this.serverInfo.setLoad(buildServerLoad());
723     this.requestCount.set(0);
724     addOutboundMsgs(outboundMessages);
725     HMsg [] msgs = null;
726     while (!this.stopped) {
727       try {
728         msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
729           outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
730           getMostLoadedRegions());
731         break;
732       } catch (IOException ioe) {
733         if (ioe instanceof RemoteException) {
734           ioe = ((RemoteException)ioe).unwrapRemoteException();
735         }
736         if (ioe instanceof YouAreDeadException) {
737           // This will be caught and handled as a fatal error in run()
738           throw ioe;
739         }
740         LOG.warn("RemoteException connecting to master", ioe);
741         // Couldn't connect to the master, get location from zk and reconnect
742         // Method blocks until new master is found or we are stopped
743         getMaster();
744       }
745     }
746     updateOutboundMsgs(outboundMessages);
747     outboundMessages.clear();
748 
749     for (int i = 0; !this.stopped && msgs != null && i < msgs.length; i++) {
750       LOG.info(msgs[i].toString());
751       // Intercept stop regionserver messages
752       if (msgs[i].getType().equals(HMsg.Type.STOP_REGIONSERVER)) {
753         stop("Received " + msgs[i]);
754         continue;
755       }
756       LOG.warn("NOT PROCESSING " + msgs[i] + " -- WHY IS MASTER SENDING IT TO US?");
757     }
758     return outboundMessages;
759   }
760 
761   private HServerLoad buildServerLoad() {
762     MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
763     HServerLoad hsl = new HServerLoad(requestCount.get(),
764       (int)(memory.getUsed() / 1024 / 1024),
765       (int) (memory.getMax() / 1024 / 1024));
766     for (HRegion r : this.onlineRegions.values()) {
767       hsl.addRegionInfo(createRegionLoad(r));
768     }
769     return hsl;
770   }
771 
772   private void closeWAL(final boolean delete) {
773     try {
774       if (this.hlog != null) {
775         if (delete) {
776           hlog.closeAndDelete();
777         } else {
778           hlog.close();
779         }
780       }
781     } catch (Throwable e) {
782       LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
783     }
784   }
785 
786   private void closeAllScanners() {
787     // Close any outstanding scanners. Means they'll get an UnknownScanner
788     // exception next time they come in.
789     for (Map.Entry<String, InternalScanner> e : this.scanners.entrySet()) {
790       try {
791         e.getValue().close();
792       } catch (IOException ioe) {
793         LOG.warn("Closing scanner " + e.getKey(), ioe);
794       }
795     }
796   }
797 
798   /*
799    * Add to the passed <code>msgs</code> messages to pass to the master.
800    *
801    * @param msgs Current outboundMsgs array; we'll add messages to this List.
802    */
803   private void addOutboundMsgs(final List<HMsg> msgs) {
804     if (msgs.isEmpty()) {
805       this.outboundMsgs.drainTo(msgs);
806       return;
807     }
808     OUTER: for (HMsg m : this.outboundMsgs) {
809       for (HMsg mm : msgs) {
810         // Be careful don't add duplicates.
811         if (mm.equals(m)) {
812           continue OUTER;
813         }
814       }
815       msgs.add(m);
816     }
817   }
818 
819   /*
820    * Remove from this.outboundMsgs those messsages we sent the master.
821    *
822    * @param msgs Messages we sent the master.
823    */
824   private void updateOutboundMsgs(final List<HMsg> msgs) {
825     if (msgs.isEmpty()) {
826       return;
827     }
828     for (HMsg m : this.outboundMsgs) {
829       for (HMsg mm : msgs) {
830         if (mm.equals(m)) {
831           this.outboundMsgs.remove(m);
832           break;
833         }
834       }
835     }
836   }
837 
838   /*
839    * Run init. Sets up hlog and starts up all server threads.
840    *
841    * @param c Extra configuration.
842    */
843   protected void handleReportForDutyResponse(final MapWritable c) throws IOException {
844     try {
845       for (Map.Entry<Writable, Writable> e : c.entrySet()) {
846 
847         String key = e.getKey().toString();
848         // Use the address the master passed us
849         if (key.equals("hbase.regionserver.address")) {
850           HServerAddress hsa = (HServerAddress) e.getValue();
851           LOG.info("Master passed us address to use. Was="
852             + this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
853           this.serverInfo.setServerAddress(hsa);
854           continue;
855         }
856         String value = e.getValue().toString();
857         if (LOG.isDebugEnabled()) {
858           LOG.debug("Config from master: " + key + "=" + value);
859         }
860         this.conf.set(key, value);
861       }
862       // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
863       // config param for task trackers, but we can piggyback off of it.
864       if (this.conf.get("mapred.task.id") == null) {
865         this.conf.set("mapred.task.id",
866             "hb_rs_" + this.serverInfo.getServerName() + "_" +
867             System.currentTimeMillis());
868       }
869 
870       // Master sent us hbase.rootdir to use. Should be fully qualified
871       // path with file system specification included. Set 'fs.defaultFS'
872       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
873       // accessors will be going against wrong filesystem (unless all is set
874       // to defaults).
875       this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
876       // Get fs instance used by this RS
877       this.fs = FileSystem.get(this.conf);
878       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
879       this.hlog = setupWALAndReplication();
880       // Init in here rather than in constructor after thread name has been set
881       this.metrics = new RegionServerMetrics();
882       startServiceThreads();
883       LOG.info("Serving as " + this.serverInfo.getServerName() +
884         ", RPC listening on " + this.server.getListenerAddress() +
885         ", sessionid=0x" +
886         Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
887       isOnline = true;
888     } catch (Throwable e) {
889       this.isOnline = false;
890       stop("Failed initialization");
891       throw convertThrowableToIOE(cleanup(e, "Failed init"),
892           "Region server startup failed");
893     }
894   }
895 
896   /*
897    * @param r Region to get RegionLoad for.
898    *
899    * @return RegionLoad instance.
900    *
901    * @throws IOException
902    */
903   private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
904     byte[] name = r.getRegionName();
905     int stores = 0;
906     int storefiles = 0;
907     int storefileSizeMB = 0;
908     int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
909     int storefileIndexSizeMB = 0;
910     synchronized (r.stores) {
911       stores += r.stores.size();
912       for (Store store : r.stores.values()) {
913         storefiles += store.getStorefilesCount();
914         storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
915         storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
916       }
917     }
918     return new HServerLoad.RegionLoad(name, stores, storefiles,
919         storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB);
920   }
921 
922   /**
923    * @param encodedRegionName
924    * @return An instance of RegionLoad.
925    * @throws IOException
926    */
927   public HServerLoad.RegionLoad createRegionLoad(final String encodedRegionName) {
928     HRegion r = null;
929     r = this.onlineRegions.get(encodedRegionName);
930     return r != null ? createRegionLoad(r) : null;
931   }
932 
933   /*
934    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
935    * IOE if it isn't already.
936    *
937    * @param t Throwable
938    *
939    * @return Throwable converted to an IOE; methods can only let out IOEs.
940    */
941   private Throwable cleanup(final Throwable t) {
942     return cleanup(t, null);
943   }
944 
945   /*
946    * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
947    * IOE if it isn't already.
948    *
949    * @param t Throwable
950    *
951    * @param msg Message to log in error. Can be null.
952    *
953    * @return Throwable converted to an IOE; methods can only let out IOEs.
954    */
955   private Throwable cleanup(final Throwable t, final String msg) {
956     // Don't log as error if NSRE; NSRE is 'normal' operation.
957     if (t instanceof NotServingRegionException) {
958       LOG.debug("NotServingRegionException; " +  t.getMessage());
959       return t;
960     }
961     if (msg == null) {
962       LOG.error("", RemoteExceptionHandler.checkThrowable(t));
963     } else {
964       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
965     }
966     if (!checkOOME(t)) {
967       checkFileSystem();
968     }
969     return t;
970   }
971 
972   /*
973    * @param t
974    *
975    * @return Make <code>t</code> an IOE if it isn't already.
976    */
977   private IOException convertThrowableToIOE(final Throwable t) {
978     return convertThrowableToIOE(t, null);
979   }
980 
981   /*
982    * @param t
983    *
984    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
985    *
986    * @return Make <code>t</code> an IOE if it isn't already.
987    */
988   private IOException convertThrowableToIOE(final Throwable t, final String msg) {
989     return (t instanceof IOException ? (IOException) t : msg == null
990         || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
991   }
992 
993   /*
994    * Check if an OOME and if so, call abort.
995    *
996    * @param e
997    *
998    * @return True if we OOME'd and are aborting.
999    */
1000   public boolean checkOOME(final Throwable e) {
1001     boolean stop = false;
1002     if (e instanceof OutOfMemoryError
1003         || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1004         || (e.getMessage() != null && e.getMessage().contains(
1005             "java.lang.OutOfMemoryError"))) {
1006       abort("OutOfMemoryError, aborting", e);
1007       stop = true;
1008     }
1009     return stop;
1010   }
1011 
1012   /**
1013    * Checks to see if the file system is still accessible. If not, sets
1014    * abortRequested and stopRequested
1015    *
1016    * @return false if file system is not available
1017    */
1018   protected boolean checkFileSystem() {
1019     if (this.fsOk && this.fs != null) {
1020       try {
1021         FSUtils.checkFileSystemAvailable(this.fs);
1022       } catch (IOException e) {
1023         abort("File System not available", e);
1024         this.fsOk = false;
1025       }
1026     }
1027     return this.fsOk;
1028   }
1029 
1030   /*
1031    * Inner class that runs on a long period checking if regions need major
1032    * compaction.
1033    */
1034   private static class MajorCompactionChecker extends Chore {
1035     private final HRegionServer instance;
1036     private int majorCompactPriority;
1037     private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1038 
1039 
1040     MajorCompactionChecker(final HRegionServer h, final int sleepTime,
1041         final Stoppable stopper) {
1042       super("MajorCompactionChecker", sleepTime, h);
1043       this.instance = h;
1044       LOG.info("Runs every " + sleepTime + "ms");
1045 
1046       /*
1047        * MajorCompactPriority is configurable.
1048        * If not set, the compaction will use default priority.
1049        */
1050       majorCompactPriority = this.instance.conf.getInt(
1051         "hbase.regionserver.compactionChecker.majorCompactPriority",
1052         DEFAULT_PRIORITY);
1053     }
1054 
1055     @Override
1056     protected void chore() {
1057       for (HRegion r : this.instance.onlineRegions.values()) {
1058         try {
1059           if (r != null && r.isMajorCompaction()) {
1060             // Queue a compaction. Will recognize if major is needed.
1061         	if(majorCompactPriority == DEFAULT_PRIORITY || 
1062         	    majorCompactPriority > r.getCompactPriority()){
1063               this.instance.compactSplitThread.requestCompaction(r, getName()
1064                 + " requests major compaction use default priority");
1065         	} else {
1066         	  this.instance.compactSplitThread.requestCompaction(r, getName()
1067                 + " requests major compaction use configured priority",
1068                 this.majorCompactPriority);
1069         	}
1070           }
1071         } catch (IOException e) {
1072           LOG.warn("Failed major compaction check on " + r, e);
1073         }
1074       }
1075     }
1076   }
1077 
1078   /**
1079    * Report the status of the server. A server is online once all the startup is
1080    * completed (setting up filesystem, starting service threads, etc.). This
1081    * method is designed mostly to be useful in tests.
1082    *
1083    * @return true if online, false if not.
1084    */
1085   public boolean isOnline() {
1086     return isOnline;
1087   }
1088 
1089   /**
1090    * Setup WAL log and replication if enabled.
1091    * Replication setup is done in here because it wants to be hooked up to WAL.
1092    * @return A WAL instance.
1093    * @throws IOException
1094    */
1095   private HLog setupWALAndReplication() throws IOException {
1096     final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1097     Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
1098     if (LOG.isDebugEnabled()) {
1099       LOG.debug("logdir=" + logdir);
1100     }
1101     if (this.fs.exists(logdir)) {
1102       throw new RegionServerRunningException("Region server already "
1103           + "running at " + this.serverInfo.getServerName()
1104           + " because logdir " + logdir.toString() + " exists");
1105     }
1106 
1107     // Instantiate replication manager if replication enabled.  Pass it the
1108     // log directories.
1109     try {
1110       this.replicationHandler = Replication.isReplication(this.conf)?
1111         new Replication(this, this.fs, logdir, oldLogDir): null;
1112     } catch (KeeperException e) {
1113       throw new IOException("Failed replication handler create", e);
1114     }
1115     return instantiateHLog(logdir, oldLogDir);
1116   }
1117 
1118   /**
1119    * Called by {@link #setupWALAndReplication()} creating WAL instance.
1120    * @param logdir
1121    * @param oldLogDir
1122    * @return WAL instance.
1123    * @throws IOException
1124    */
1125   protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
1126     return new HLog(this.fs, logdir, oldLogDir, this.conf,
1127       getWALActionListeners(), this.serverInfo.getServerAddress().toString());
1128   }
1129 
1130   /**
1131    * Called by {@link #instantiateHLog(Path, Path)} setting up WAL instance.
1132    * Add any {@link WALObserver}s you want inserted before WAL startup.
1133    * @return List of WALActionsListener that will be passed in to
1134    * {@link HLog} on construction.
1135    */
1136   protected List<WALObserver> getWALActionListeners() {
1137     List<WALObserver> listeners = new ArrayList<WALObserver>();
1138     // Log roller.
1139     this.hlogRoller = new LogRoller(this, this);
1140     listeners.add(this.hlogRoller);
1141     if (this.replicationHandler != null) {
1142       // Replication handler is an implementation of WALActionsListener.
1143       listeners.add(this.replicationHandler);
1144     }
1145     return listeners;
1146   }
1147 
1148   protected LogRoller getLogRoller() {
1149     return hlogRoller;
1150   }
1151 
1152   /*
1153    * @param interval Interval since last time metrics were called.
1154    */
1155   protected void doMetrics() {
1156     try {
1157       metrics();
1158     } catch (Throwable e) {
1159       LOG.warn("Failed metrics", e);
1160     }
1161   }
1162 
1163   protected void metrics() {
1164     int seconds = this.msgInterval / 1000;
1165     if(0 == seconds){
1166        seconds = 1;
1167     }
1168     this.metrics.regions.set(this.onlineRegions.size());
1169     this.metrics.requests.set(this.requestCount.get()/seconds);
1170     // Is this too expensive every three seconds getting a lock on onlineRegions
1171     // and then per store carried? Can I make metrics be sloppier and avoid
1172     // the synchronizations?
1173     int stores = 0;
1174     int storefiles = 0;
1175     long memstoreSize = 0;
1176     long storefileIndexSize = 0;
1177     for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1178         HRegion r = e.getValue();
1179         memstoreSize += r.memstoreSize.get();
1180         synchronized (r.stores) {
1181           stores += r.stores.size();
1182           for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
1183             Store store = ee.getValue();
1184             storefiles += store.getStorefilesCount();
1185             storefileIndexSize += store.getStorefilesIndexSize();
1186           }
1187         }
1188       }
1189     this.metrics.stores.set(stores);
1190     this.metrics.storefiles.set(storefiles);
1191     this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
1192     this.metrics.storefileIndexSizeMB
1193         .set((int) (storefileIndexSize / (1024 * 1024)));
1194     this.metrics.compactionQueueSize.set(compactSplitThread
1195         .getCompactionQueueSize());
1196     this.metrics.flushQueueSize.set(cacheFlusher
1197         .getFlushQueueSize());
1198 
1199     LruBlockCache lruBlockCache = (LruBlockCache) StoreFile.getBlockCache(conf);
1200     if (lruBlockCache != null) {
1201       this.metrics.blockCacheCount.set(lruBlockCache.size());
1202       this.metrics.blockCacheFree.set(lruBlockCache.getFreeSize());
1203       this.metrics.blockCacheSize.set(lruBlockCache.getCurrentSize());
1204       CacheStats cacheStats = lruBlockCache.getStats();
1205       this.metrics.blockCacheHitCount.set(cacheStats.getHitCount());
1206       this.metrics.blockCacheMissCount.set(cacheStats.getMissCount());
1207       this.metrics.blockCacheEvictedCount.set(lruBlockCache.getEvictedCount());
1208       double ratio = lruBlockCache.getStats().getHitRatio();
1209       int percent = (int) (ratio * 100);
1210       this.metrics.blockCacheHitRatio.set(percent);
1211       ratio = lruBlockCache.getStats().getHitCachingRatio();
1212       percent = (int) (ratio * 100);
1213       this.metrics.blockCacheHitCachingRatio.set(percent);
1214     }
1215   }
1216 
1217   /**
1218    * @return Region server metrics instance.
1219    */
1220   public RegionServerMetrics getMetrics() {
1221     return this.metrics;
1222   }
1223 
1224   /*
1225    * Start maintanence Threads, Server, Worker and lease checker threads.
1226    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
1227    * get an unhandled exception. We cannot set the handler on all threads.
1228    * Server's internal Listener thread is off limits. For Server, if an OOME, it
1229    * waits a while then retries. Meantime, a flush or a compaction that tries to
1230    * run should trigger same critical condition and the shutdown will run. On
1231    * its way out, this server will shut down Server. Leases are sort of
1232    * inbetween. It has an internal thread that while it inherits from Chore, it
1233    * keeps its own internal stop mechanism so needs to be stopped by this
1234    * hosting server. Worker logs the exception and exits.
1235    */
1236   private void startServiceThreads() throws IOException {
1237     String n = Thread.currentThread().getName();
1238     UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
1239       public void uncaughtException(Thread t, Throwable e) {
1240         abort("Uncaught exception in service thread " + t.getName(), e);
1241       }
1242     };
1243 
1244     // Start executor services
1245     this.service = new ExecutorService(getServerName());
1246     this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1247       conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1248     this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
1249       conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
1250     this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1251       conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1252     this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1253       conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1254     this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
1255       conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
1256     this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1257       conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1258 
1259     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
1260     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
1261         handler);
1262     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
1263         handler);
1264     Threads.setDaemonThreadRunning(this.majorCompactionChecker, n
1265         + ".majorCompactionChecker", handler);
1266 
1267     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
1268     // an unhandled exception, it will just exit.
1269     this.leases.setName(n + ".leaseChecker");
1270     this.leases.start();
1271     // Put up info server.
1272     int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
1273     // -1 is for disabling info server
1274     if (port >= 0) {
1275       String addr = this.conf.get("hbase.regionserver.info.bindAddress",
1276           "0.0.0.0");
1277       // check if auto port bind enabled
1278       boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto",
1279           false);
1280       while (true) {
1281         try {
1282           this.infoServer = new InfoServer("regionserver", addr, port, false);
1283           this.infoServer.setAttribute("regionserver", this);
1284           this.infoServer.start();
1285           break;
1286         } catch (BindException e) {
1287           if (!auto) {
1288             // auto bind disabled throw BindException
1289             throw e;
1290           }
1291           // auto bind enabled, try to use another port
1292           LOG.info("Failed binding http info server to port: " + port);
1293           port++;
1294           // update HRS server info port.
1295           this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
1296             this.serverInfo.getStartCode(), port,
1297             this.serverInfo.getHostname());
1298         }
1299       }
1300     }
1301 
1302     if (this.replicationHandler != null) {
1303       this.replicationHandler.startReplicationServices();
1304     }
1305 
1306     // Start Server.  This service is like leases in that it internally runs
1307     // a thread.
1308     this.server.start();
1309   }
1310 
1311   /*
1312    * Verify that server is healthy
1313    */
1314   private boolean isHealthy() {
1315     if (!fsOk) {
1316       // File system problem
1317       return false;
1318     }
1319     // Verify that all threads are alive
1320     if (!(leases.isAlive() && compactSplitThread.isAlive()
1321         && cacheFlusher.isAlive() && hlogRoller.isAlive()
1322         && this.majorCompactionChecker.isAlive())) {
1323       stop("One or more threads are no longer alive -- stop");
1324       return false;
1325     }
1326     return true;
1327   }
1328 
1329   @Override
1330   public HLog getWAL() {
1331     return this.hlog;
1332   }
1333 
1334   @Override
1335   public CatalogTracker getCatalogTracker() {
1336     return this.catalogTracker;
1337   }
1338 
1339   @Override
1340   public void stop(final String msg) {
1341     this.stopped = true;
1342     LOG.info("STOPPED: " + msg);
1343     synchronized (this) {
1344       // Wakes run() if it is sleeping
1345       notifyAll(); // FindBugs NN_NAKED_NOTIFY
1346     }
1347   }
1348 
1349   @Override
1350   public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
1351       final boolean daughter)
1352   throws KeeperException, IOException {
1353     // Do checks to see if we need to compact (references or too many files)
1354     if (r.hasReferences() || r.hasTooManyStoreFiles()) {
1355       getCompactionRequester().requestCompaction(r,
1356         r.hasReferences()? "Region has references on open" :
1357           "Region has too many store files");
1358     }
1359 
1360     // Add to online regions if all above was successful.
1361     addToOnlineRegions(r);
1362 
1363     // Update ZK, ROOT or META
1364     if (r.getRegionInfo().isRootRegion()) {
1365       RootLocationEditor.setRootLocation(getZooKeeper(),
1366         getServerInfo().getServerAddress());
1367     } else if (r.getRegionInfo().isMetaRegion()) {
1368       MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
1369     } else {
1370       if (daughter) {
1371         // If daughter of a split, update whole row, not just location.
1372         MetaEditor.addDaughter(ct, r.getRegionInfo(), getServerInfo());
1373       } else {
1374         MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
1375       }
1376     }
1377   }
1378 
1379   /**
1380    * Return a reference to the metrics instance used for counting RPC calls.
1381    * @return Metrics instance.
1382    */
1383   public HBaseRpcMetrics getRpcMetrics() {
1384     return server.getRpcMetrics();
1385   }
1386 
1387   /**
1388    * Cause the server to exit without closing the regions it is serving, the log
1389    * it is using and without notifying the master. Used unit testing and on
1390    * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
1391    *
1392    * @param reason
1393    *          the reason we are aborting
1394    * @param cause
1395    *          the exception that caused the abort, or null
1396    */
1397   public void abort(String reason, Throwable cause) {
1398     if (cause != null) {
1399       LOG.fatal("ABORTING region server " + this + ": " + reason, cause);
1400     } else {
1401       LOG.fatal("ABORTING region server " + this + ": " + reason);
1402     }
1403     this.abortRequested = true;
1404     this.reservedSpace.clear();
1405     if (this.metrics != null) {
1406       LOG.info("Dump of metrics: " + this.metrics);
1407     }
1408     stop(reason);
1409   }
1410 
1411   /**
1412    * @see HRegionServer#abort(String, Throwable)
1413    */
1414   public void abort(String reason) {
1415     abort(reason, null);
1416   }
1417 
1418   /*
1419    * Simulate a kill -9 of this server. Exits w/o closing regions or cleaninup
1420    * logs but it does close socket in case want to bring up server on old
1421    * hostname+port immediately.
1422    */
1423   protected void kill() {
1424     this.killed = true;
1425     abort("Simulated kill");
1426   }
1427 
1428   /**
1429    * Wait on all threads to finish. Presumption is that all closes and stops
1430    * have already been called.
1431    */
1432   protected void join() {
1433     Threads.shutdown(this.majorCompactionChecker);
1434     Threads.shutdown(this.cacheFlusher);
1435     Threads.shutdown(this.compactSplitThread);
1436     Threads.shutdown(this.hlogRoller);
1437     this.service.shutdown();
1438     if (this.replicationHandler != null) {
1439       this.replicationHandler.join();
1440     }
1441   }
1442 
1443   /**
1444    * Get the current master from ZooKeeper and open the RPC connection to it.
1445    *
1446    * Method will block until a master is available. You can break from this
1447    * block by requesting the server stop.
1448    *
1449    * @return master address, or null if server has been stopped
1450    */
1451   private HServerAddress getMaster() {
1452     HServerAddress masterAddress = null;
1453     HMasterRegionInterface master = null;
1454 
1455     while (!stopped && master == null) {
1456 
1457       masterAddress = getMasterAddress();
1458       LOG.info("Attempting connect to Master server at " + masterAddress);
1459       try {
1460         // Do initial RPC setup. The final argument indicates that the RPC
1461         // should retry indefinitely.
1462         master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
1463             HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
1464             masterAddress.getInetSocketAddress(), this.conf, -1,
1465             this.rpcTimeout, this.rpcTimeout);
1466       } catch (IOException e) {
1467         e = e instanceof RemoteException ?
1468             ((RemoteException)e).unwrapRemoteException() : e;
1469         if (e instanceof ServerNotRunningException) {
1470           LOG.info("Master isn't available yet, retrying");
1471         } else {
1472           LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1473         }
1474         sleeper.sleep();
1475       }
1476     }
1477     LOG.info("Connected to master at " + masterAddress);
1478     this.hbaseMaster = master;
1479     return masterAddress;
1480   }
1481 
1482   private HServerAddress getMasterAddress() {
1483     HServerAddress masterAddress = null;
1484       while ((masterAddress = masterAddressManager.getMasterAddress()) == null) {
1485         if (stopped) {
1486           return null;
1487         }
1488         LOG.debug("No master found, will retry");
1489         sleeper.sleep();
1490       }
1491       return masterAddress;
1492    }
1493 
1494   /**
1495    * @return True if successfully invoked {@link #reportForDuty()}
1496    * @throws IOException
1497    */
1498   private boolean tryReportForDuty() throws IOException {
1499     MapWritable w = reportForDuty();
1500     if (w != null) {
1501       handleReportForDutyResponse(w);
1502       return true;
1503     }
1504     sleeper.sleep();
1505     LOG.warn("No response on reportForDuty. Sleeping and then retrying.");
1506     return false;
1507   }
1508 
1509   /*
1510    * Let the master know we're here Run initialization using parameters passed
1511    * us by the master.
1512    */
1513   private MapWritable reportForDuty() throws IOException {
1514     HServerAddress masterAddress = null;
1515     while (!stopped && (masterAddress = getMaster()) == null) {
1516       sleeper.sleep();
1517       LOG.warn("Unable to get master for initialization");
1518     }
1519 
1520     MapWritable result = null;
1521     long lastMsg = 0;
1522     
1523     //If some exception occured while reporting to Master, we should 
1524     //recheck the master address to avoid the address has been updated
1525     boolean recheckMasterAddr = false;
1526     while (!stopped) {
1527       try {
1528         if (recheckMasterAddr) {
1529           masterAddress = getMaster();
1530         }  
1531         this.requestCount.set(0);
1532         lastMsg = System.currentTimeMillis();
1533         ZKUtil.setAddressAndWatch(zooKeeper,
1534           ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)),
1535           this.serverInfo.getServerAddress());
1536         this.serverInfo.setLoad(buildServerLoad());
1537         LOG.info("Telling master at " + masterAddress + " that we are up");
1538         result = this.hbaseMaster.regionServerStartup(this.serverInfo,
1539             EnvironmentEdgeManager.currentTimeMillis());
1540         break;
1541       } catch (RemoteException e) {
1542         IOException ioe = e.unwrapRemoteException();
1543         if (ioe instanceof ClockOutOfSyncException) {
1544           LOG.fatal("Master rejected startup because clock is out of sync",
1545               ioe);
1546           // Re-throw IOE will cause RS to abort
1547           throw ioe;
1548         } else {
1549           recheckMasterAddr = true;
1550           LOG.warn("remote error telling master we are up", e);
1551         }
1552       } catch (IOException e) {
1553         recheckMasterAddr = true;
1554         LOG.warn("error telling master we are up", e);
1555       } catch (KeeperException e) {
1556         recheckMasterAddr = true;
1557         LOG.warn("error putting up ephemeral node in zookeeper", e);
1558       }
1559       sleeper.sleep(lastMsg);
1560     }
1561     return result;
1562   }
1563 
1564   /**
1565    * Add to the outbound message buffer
1566    *
1567    * When a region splits, we need to tell the master that there are two new
1568    * regions that need to be assigned.
1569    *
1570    * We do not need to inform the master about the old region, because we've
1571    * updated the meta or root regions, and the master will pick that up on its
1572    * next rescan of the root or meta tables.
1573    */
1574   void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
1575       HRegionInfo newRegionB) {
1576     this.outboundMsgs.add(new HMsg(
1577         HMsg.Type.REGION_SPLIT, oldRegion, newRegionA,
1578         newRegionB, Bytes.toBytes("Daughters; "
1579             + newRegionA.getRegionNameAsString() + ", "
1580             + newRegionB.getRegionNameAsString())));
1581   }
1582 
1583   /**
1584    * Closes all regions.  Called on our way out.
1585    * Assumes that its not possible for new regions to be added to onlineRegions
1586    * while this method runs.
1587    */
1588   protected void closeAllRegions(final boolean abort) {
1589     closeUserRegions(abort);
1590     // Only root and meta should remain.  Are we carrying root or meta?
1591     HRegion meta = null;
1592     HRegion root = null;
1593     this.lock.writeLock().lock();
1594     try {
1595       for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
1596         HRegionInfo hri = e.getValue().getRegionInfo();
1597         if (hri.isRootRegion()) {
1598           root = e.getValue();
1599         } else if (hri.isMetaRegion()) {
1600           meta = e.getValue();
1601         }
1602         if (meta != null && root != null) break;
1603       }
1604     } finally {
1605       this.lock.writeLock().unlock();
1606     }
1607     if (meta != null) closeRegion(meta.getRegionInfo(), abort, false);
1608     if (root != null) closeRegion(root.getRegionInfo(), abort, false);
1609   }
1610 
1611   /**
1612    * Schedule closes on all user regions.
1613    * @param abort Whether we're running an abort.
1614    */
1615   void closeUserRegions(final boolean abort) {
1616     this.lock.writeLock().lock();
1617     try {
1618       for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1619         HRegion r = e.getValue();
1620         if (!r.getRegionInfo().isMetaRegion()) {
1621           // Don't update zk with this close transition; pass false.
1622           closeRegion(r.getRegionInfo(), abort, false);
1623         }
1624       }
1625     } finally {
1626       this.lock.writeLock().unlock();
1627     }
1628   }
1629 
1630   @Override
1631   @QosPriority(priority=HIGH_QOS)
1632   public HRegionInfo getRegionInfo(final byte[] regionName)
1633   throws NotServingRegionException, IOException {
1634     checkOpen();
1635     requestCount.incrementAndGet();
1636     return getRegion(regionName).getRegionInfo();
1637   }
1638 
1639   public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
1640       final byte[] family) throws IOException {
1641     checkOpen();
1642     requestCount.incrementAndGet();
1643     try {
1644       // locate the region we're operating on
1645       HRegion region = getRegion(regionName);
1646       // ask the region for all the data
1647 
1648       Result r = region.getClosestRowBefore(row, family);
1649       return r;
1650     } catch (Throwable t) {
1651       throw convertThrowableToIOE(cleanup(t));
1652     }
1653   }
1654 
1655   /** {@inheritDoc} */
1656   public Result get(byte[] regionName, Get get) throws IOException {
1657     checkOpen();
1658     requestCount.incrementAndGet();
1659     try {
1660       HRegion region = getRegion(regionName);
1661       return region.get(get, getLockFromId(get.getLockId()));
1662     } catch (Throwable t) {
1663       throw convertThrowableToIOE(cleanup(t));
1664     }
1665   }
1666 
1667   public boolean exists(byte[] regionName, Get get) throws IOException {
1668     checkOpen();
1669     requestCount.incrementAndGet();
1670     try {
1671       HRegion region = getRegion(regionName);
1672       Result r = region.get(get, getLockFromId(get.getLockId()));
1673       return r != null && !r.isEmpty();
1674     } catch (Throwable t) {
1675       throw convertThrowableToIOE(cleanup(t));
1676     }
1677   }
1678 
1679   public void put(final byte[] regionName, final Put put) throws IOException {
1680     if (put.getRow() == null) {
1681       throw new IllegalArgumentException("update has null row");
1682     }
1683 
1684     checkOpen();
1685     this.requestCount.incrementAndGet();
1686     HRegion region = getRegion(regionName);
1687     try {
1688       if (!region.getRegionInfo().isMetaTable()) {
1689         this.cacheFlusher.reclaimMemStoreMemory();
1690       }
1691       boolean writeToWAL = put.getWriteToWAL();
1692       region.put(put, getLockFromId(put.getLockId()), writeToWAL);
1693     } catch (Throwable t) {
1694       throw convertThrowableToIOE(cleanup(t));
1695     }
1696   }
1697 
1698   public int put(final byte[] regionName, final List<Put> puts)
1699       throws IOException {
1700     checkOpen();
1701     HRegion region = null;
1702     try {
1703       region = getRegion(regionName);
1704       if (!region.getRegionInfo().isMetaTable()) {
1705         this.cacheFlusher.reclaimMemStoreMemory();
1706       }
1707 
1708       @SuppressWarnings("unchecked")
1709       Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
1710 
1711       int i = 0;
1712       for (Put p : puts) {
1713         Integer lock = getLockFromId(p.getLockId());
1714         putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
1715       }
1716 
1717       this.requestCount.addAndGet(puts.size());
1718       OperationStatusCode[] codes = region.put(putsWithLocks);
1719       for (i = 0; i < codes.length; i++) {
1720         if (codes[i] != OperationStatusCode.SUCCESS) {
1721           return i;
1722         }
1723       }
1724       return -1;
1725     } catch (Throwable t) {
1726       throw convertThrowableToIOE(cleanup(t));
1727     }
1728   }
1729 
1730   private boolean checkAndMutate(final byte[] regionName, final byte[] row,
1731       final byte[] family, final byte[] qualifier, final byte[] value,
1732       final Writable w, Integer lock) throws IOException {
1733     checkOpen();
1734     this.requestCount.incrementAndGet();
1735     HRegion region = getRegion(regionName);
1736     try {
1737       if (!region.getRegionInfo().isMetaTable()) {
1738         this.cacheFlusher.reclaimMemStoreMemory();
1739       }
1740       return region
1741           .checkAndMutate(row, family, qualifier, value, w, lock, true);
1742     } catch (Throwable t) {
1743       throw convertThrowableToIOE(cleanup(t));
1744     }
1745   }
1746 
1747   /**
1748    *
1749    * @param regionName
1750    * @param row
1751    * @param family
1752    * @param qualifier
1753    * @param value
1754    *          the expected value
1755    * @param put
1756    * @throws IOException
1757    * @return true if the new put was execute, false otherwise
1758    */
1759   public boolean checkAndPut(final byte[] regionName, final byte[] row,
1760       final byte[] family, final byte[] qualifier, final byte[] value,
1761       final Put put) throws IOException {
1762     return checkAndMutate(regionName, row, family, qualifier, value, put,
1763         getLockFromId(put.getLockId()));
1764   }
1765 
1766   /**
1767    *
1768    * @param regionName
1769    * @param row
1770    * @param family
1771    * @param qualifier
1772    * @param value
1773    *          the expected value
1774    * @param delete
1775    * @throws IOException
1776    * @return true if the new put was execute, false otherwise
1777    */
1778   public boolean checkAndDelete(final byte[] regionName, final byte[] row,
1779       final byte[] family, final byte[] qualifier, final byte[] value,
1780       final Delete delete) throws IOException {
1781     return checkAndMutate(regionName, row, family, qualifier, value, delete,
1782         getLockFromId(delete.getLockId()));
1783   }
1784 
1785   //
1786   // remote scanner interface
1787   //
1788 
1789   public long openScanner(byte[] regionName, Scan scan) throws IOException {
1790     checkOpen();
1791     NullPointerException npe = null;
1792     if (regionName == null) {
1793       npe = new NullPointerException("regionName is null");
1794     } else if (scan == null) {
1795       npe = new NullPointerException("scan is null");
1796     }
1797     if (npe != null) {
1798       throw new IOException("Invalid arguments to openScanner", npe);
1799     }
1800     requestCount.incrementAndGet();
1801     try {
1802       HRegion r = getRegion(regionName);
1803       return addScanner(r.getScanner(scan));
1804     } catch (Throwable t) {
1805       throw convertThrowableToIOE(cleanup(t, "Failed openScanner"));
1806     }
1807   }
1808 
1809   protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
1810     long scannerId = -1L;
1811     scannerId = rand.nextLong();
1812     String scannerName = String.valueOf(scannerId);
1813     scanners.put(scannerName, s);
1814     this.leases.createLease(scannerName, new ScannerListener(scannerName));
1815     return scannerId;
1816   }
1817 
1818   public Result next(final long scannerId) throws IOException {
1819     Result[] res = next(scannerId, 1);
1820     if (res == null || res.length == 0) {
1821       return null;
1822     }
1823     return res[0];
1824   }
1825 
1826   public Result[] next(final long scannerId, int nbRows) throws IOException {
1827     String scannerName = String.valueOf(scannerId);
1828     InternalScanner s = this.scanners.get(scannerName);
1829     if (s == null) throw new UnknownScannerException("Name: " + scannerName);
1830     try {
1831       checkOpen();
1832     } catch (IOException e) {
1833       // If checkOpen failed, server not running or filesystem gone,
1834       // cancel this lease; filesystem is gone or we're closing or something.
1835       try {
1836         this.leases.cancelLease(scannerName);
1837       } catch (LeaseException le) {
1838         LOG.info("Server shutting down and client tried to access missing scanner " +
1839           scannerName);
1840       }
1841       throw e;
1842     }
1843     Leases.Lease lease = null;
1844     try {
1845       // Remove lease while its being processed in server; protects against case
1846       // where processing of request takes > lease expiration time.
1847       lease = this.leases.removeLease(scannerName);
1848       List<Result> results = new ArrayList<Result>(nbRows);
1849       long currentScanResultSize = 0;
1850       List<KeyValue> values = new ArrayList<KeyValue>();
1851       for (int i = 0; i < nbRows
1852           && currentScanResultSize < maxScannerResultSize; i++) {
1853         requestCount.incrementAndGet();
1854         // Collect values to be returned here
1855         boolean moreRows = s.next(values);
1856         if (!values.isEmpty()) {
1857           for (KeyValue kv : values) {
1858             currentScanResultSize += kv.heapSize();
1859           }
1860           results.add(new Result(values));
1861         }
1862         if (!moreRows) {
1863           break;
1864         }
1865         values.clear();
1866       }
1867       // Below is an ugly hack where we cast the InternalScanner to be a
1868       // HRegion.RegionScanner. The alternative is to change InternalScanner
1869       // interface but its used everywhere whereas we just need a bit of info
1870       // from HRegion.RegionScanner, IF its filter if any is done with the scan
1871       // and wants to tell the client to stop the scan. This is done by passing
1872       // a null result.
1873       return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
1874           : results.toArray(new Result[0]);
1875     } catch (Throwable t) {
1876       if (t instanceof NotServingRegionException) {
1877         this.scanners.remove(scannerName);
1878       }
1879       throw convertThrowableToIOE(cleanup(t));
1880     } finally {
1881       // We're done. On way out readd the above removed lease.  Adding resets
1882       // expiration time on lease.
1883       if (this.scanners.containsKey(scannerName)) {
1884         if (lease != null) this.leases.addLease(lease);
1885       }
1886     }
1887   }
1888 
1889   public void close(final long scannerId) throws IOException {
1890     try {
1891       checkOpen();
1892       requestCount.incrementAndGet();
1893       String scannerName = String.valueOf(scannerId);
1894       InternalScanner s = scanners.remove(scannerName);
1895       if (s != null) {
1896         s.close();
1897         this.leases.cancelLease(scannerName);
1898       }
1899     } catch (Throwable t) {
1900       throw convertThrowableToIOE(cleanup(t));
1901     }
1902   }
1903 
1904   /**
1905    * Instantiated as a scanner lease. If the lease times out, the scanner is
1906    * closed
1907    */
1908   private class ScannerListener implements LeaseListener {
1909     private final String scannerName;
1910 
1911     ScannerListener(final String n) {
1912       this.scannerName = n;
1913     }
1914 
1915     public void leaseExpired() {
1916       LOG.info("Scanner " + this.scannerName + " lease expired");
1917       InternalScanner s = scanners.remove(this.scannerName);
1918       if (s != null) {
1919         try {
1920           s.close();
1921         } catch (IOException e) {
1922           LOG.error("Closing scanner", e);
1923         }
1924       }
1925     }
1926   }
1927 
1928   //
1929   // Methods that do the actual work for the remote API
1930   //
1931   public void delete(final byte[] regionName, final Delete delete)
1932       throws IOException {
1933     checkOpen();
1934     try {
1935       boolean writeToWAL = true;
1936       this.requestCount.incrementAndGet();
1937       HRegion region = getRegion(regionName);
1938       if (!region.getRegionInfo().isMetaTable()) {
1939         this.cacheFlusher.reclaimMemStoreMemory();
1940       }
1941       Integer lid = getLockFromId(delete.getLockId());
1942       region.delete(delete, lid, writeToWAL);
1943     } catch (Throwable t) {
1944       throw convertThrowableToIOE(cleanup(t));
1945     }
1946   }
1947 
1948   public int delete(final byte[] regionName, final List<Delete> deletes)
1949       throws IOException {
1950     checkOpen();
1951     // Count of Deletes processed.
1952     int i = 0;
1953     HRegion region = null;
1954     try {
1955       boolean writeToWAL = true;
1956       region = getRegion(regionName);
1957       if (!region.getRegionInfo().isMetaTable()) {
1958         this.cacheFlusher.reclaimMemStoreMemory();
1959       }
1960       int size = deletes.size();
1961       Integer[] locks = new Integer[size];
1962       for (Delete delete : deletes) {
1963         this.requestCount.incrementAndGet();
1964         locks[i] = getLockFromId(delete.getLockId());
1965         region.delete(delete, locks[i], writeToWAL);
1966         i++;
1967       }
1968     } catch (WrongRegionException ex) {
1969       LOG.debug("Batch deletes: " + i, ex);
1970       return i;
1971     } catch (NotServingRegionException ex) {
1972       return i;
1973     } catch (Throwable t) {
1974       throw convertThrowableToIOE(cleanup(t));
1975     }
1976     return -1;
1977   }
1978 
1979   public long lockRow(byte[] regionName, byte[] row) throws IOException {
1980     checkOpen();
1981     NullPointerException npe = null;
1982     if (regionName == null) {
1983       npe = new NullPointerException("regionName is null");
1984     } else if (row == null) {
1985       npe = new NullPointerException("row to lock is null");
1986     }
1987     if (npe != null) {
1988       IOException io = new IOException("Invalid arguments to lockRow");
1989       io.initCause(npe);
1990       throw io;
1991     }
1992     requestCount.incrementAndGet();
1993     try {
1994       HRegion region = getRegion(regionName);
1995       Integer r = region.obtainRowLock(row);
1996       long lockId = addRowLock(r, region);
1997       LOG.debug("Row lock " + lockId + " explicitly acquired by client");
1998       return lockId;
1999     } catch (Throwable t) {
2000       throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
2001           + this.fsOk + ")"));
2002     }
2003   }
2004 
2005   protected long addRowLock(Integer r, HRegion region)
2006       throws LeaseStillHeldException {
2007     long lockId = -1L;
2008     lockId = rand.nextLong();
2009     String lockName = String.valueOf(lockId);
2010     rowlocks.put(lockName, r);
2011     this.leases.createLease(lockName, new RowLockListener(lockName, region));
2012     return lockId;
2013   }
2014 
2015   /**
2016    * Method to get the Integer lock identifier used internally from the long
2017    * lock identifier used by the client.
2018    *
2019    * @param lockId
2020    *          long row lock identifier from client
2021    * @return intId Integer row lock used internally in HRegion
2022    * @throws IOException
2023    *           Thrown if this is not a valid client lock id.
2024    */
2025   Integer getLockFromId(long lockId) throws IOException {
2026     if (lockId == -1L) {
2027       return null;
2028     }
2029     String lockName = String.valueOf(lockId);
2030     Integer rl = rowlocks.get(lockName);
2031     if (rl == null) {
2032       throw new UnknownRowLockException("Invalid row lock");
2033     }
2034     this.leases.renewLease(lockName);
2035     return rl;
2036   }
2037 
2038   @Override
2039   @QosPriority(priority=HIGH_QOS)
2040   public void unlockRow(byte[] regionName, long lockId) throws IOException {
2041     checkOpen();
2042     NullPointerException npe = null;
2043     if (regionName == null) {
2044       npe = new NullPointerException("regionName is null");
2045     } else if (lockId == -1L) {
2046       npe = new NullPointerException("lockId is null");
2047     }
2048     if (npe != null) {
2049       IOException io = new IOException("Invalid arguments to unlockRow");
2050       io.initCause(npe);
2051       throw io;
2052     }
2053     requestCount.incrementAndGet();
2054     try {
2055       HRegion region = getRegion(regionName);
2056       String lockName = String.valueOf(lockId);
2057       Integer r = rowlocks.remove(lockName);
2058       if (r == null) {
2059         throw new UnknownRowLockException(lockName);
2060       }
2061       region.releaseRowLock(r);
2062       this.leases.cancelLease(lockName);
2063       LOG.debug("Row lock " + lockId
2064           + " has been explicitly released by client");
2065     } catch (Throwable t) {
2066       throw convertThrowableToIOE(cleanup(t));
2067     }
2068   }
2069 
2070   @Override
2071   public void bulkLoadHFile(String hfilePath, byte[] regionName,
2072       byte[] familyName) throws IOException {
2073     checkOpen();
2074     HRegion region = getRegion(regionName);
2075     region.bulkLoadHFile(hfilePath, familyName);
2076   }
2077 
  /**
   * Outstanding client row locks: lock name (the client-visible lock id
   * rendered with String.valueOf) mapped to the Integer row lock obtained
   * from the region.  Entries are added by addRowLock and removed by
   * unlockRow or, when the lock's lease expires, by RowLockListener.
   */
  Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
2079 
2080   /**
2081    * Instantiated as a row lock lease. If the lease times out, the row lock is
2082    * released
2083    */
2084   private class RowLockListener implements LeaseListener {
2085     private final String lockName;
2086     private final HRegion region;
2087 
2088     RowLockListener(final String lockName, final HRegion region) {
2089       this.lockName = lockName;
2090       this.region = region;
2091     }
2092 
2093     public void leaseExpired() {
2094       LOG.info("Row Lock " + this.lockName + " lease expired");
2095       Integer r = rowlocks.remove(this.lockName);
2096       if (r != null) {
2097         region.releaseRowLock(r);
2098       }
2099     }
2100   }
2101 
2102   // Region open/close direct RPCs
2103 
2104   @Override
2105   @QosPriority(priority=HIGH_QOS)
2106   public void openRegion(HRegionInfo region)
2107   throws IOException {
2108     if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2109       throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
2110     }
2111     LOG.info("Received request to open region: " +
2112       region.getRegionNameAsString());
2113     if (this.stopped) throw new RegionServerStoppedException();
2114     if (region.isRootRegion()) {
2115       this.service.submit(new OpenRootHandler(this, this, region));
2116     } else if(region.isMetaRegion()) {
2117       this.service.submit(new OpenMetaHandler(this, this, region));
2118     } else {
2119       this.service.submit(new OpenRegionHandler(this, this, region));
2120     }
2121   }
2122 
2123   @Override
2124   @QosPriority(priority=HIGH_QOS)
2125   public void openRegions(List<HRegionInfo> regions)
2126   throws IOException {
2127     LOG.info("Received request to open " + regions.size() + " region(s)");
2128     for (HRegionInfo region: regions) openRegion(region);
2129   }
2130 
2131   @Override
2132   @QosPriority(priority=HIGH_QOS)
2133   public boolean closeRegion(HRegionInfo region)
2134   throws IOException {
2135     return closeRegion(region, true);
2136   }
2137 
2138   @Override
2139   @QosPriority(priority=HIGH_QOS)
2140   public boolean closeRegion(HRegionInfo region, final boolean zk)
2141   throws IOException {
2142     LOG.info("Received close region: " + region.getRegionNameAsString());
2143     boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
2144     if (!hasit) {
2145       LOG.warn("Received close for region we are not serving; " +
2146         region.getEncodedName());
2147       throw new NotServingRegionException("Received close for "
2148         + region.getRegionNameAsString() + " but we are not serving it");
2149     }
2150     if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2151       throw new RegionAlreadyInTransitionException("close", region.getEncodedName());
2152     }
2153     return closeRegion(region, false, zk);
2154   }
2155 
2156   /**
2157    * @param region Region to close
2158    * @param abort True if we are aborting
2159    * @param zk True if we are to update zk about the region close; if the close
2160    * was orchestrated by master, then update zk.  If the close is being run by
2161    * the regionserver because its going down, don't update zk.
2162    * @return True if closed a region.
2163    */
2164   protected boolean closeRegion(HRegionInfo region, final boolean abort,
2165       final boolean zk) {
2166     if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2167       LOG.warn("Received close for region we are already opening or closing; " +
2168           region.getEncodedName());
2169       return false;
2170     }
2171     CloseRegionHandler crh = null;
2172     if (region.isRootRegion()) {
2173       crh = new CloseRootHandler(this, this, region, abort, zk);
2174     } else if (region.isMetaRegion()) {
2175       crh = new CloseMetaHandler(this, this, region, abort, zk);
2176     } else {
2177       crh = new CloseRegionHandler(this, this, region, abort, zk);
2178     }
2179     this.service.submit(crh);
2180     return true;
2181   }
2182 
2183   // Manual remote region administration RPCs
2184 
2185   @Override
2186   @QosPriority(priority=HIGH_QOS)
2187   public void flushRegion(HRegionInfo regionInfo)
2188       throws NotServingRegionException, IOException {
2189     checkOpen();
2190     LOG.info("Flushing " + regionInfo.getRegionNameAsString());
2191     HRegion region = getRegion(regionInfo.getRegionName());
2192     region.flushcache();
2193   }
2194 
2195   @Override
2196   @QosPriority(priority=HIGH_QOS)
2197   public void splitRegion(HRegionInfo regionInfo)
2198       throws NotServingRegionException, IOException {
2199     splitRegion(regionInfo, null);
2200   }
2201 
2202   @Override
2203   public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
2204       throws NotServingRegionException, IOException {
2205     checkOpen();
2206     HRegion region = getRegion(regionInfo.getRegionName());
2207     region.flushcache();
2208     region.forceSplit(splitPoint);
2209     // force a compaction, split will be side-effect
2210     // TODO: flush/compact/split refactor will make it trivial to do this
2211     // sync/async (and won't require us to do a compaction to split!)
2212     compactSplitThread.requestCompaction(region, "User-triggered split",
2213         CompactSplitThread.PRIORITY_USER);
2214   }
2215 
2216   @Override
2217   @QosPriority(priority=HIGH_QOS)
2218   public void compactRegion(HRegionInfo regionInfo, boolean major)
2219       throws NotServingRegionException, IOException {
2220     checkOpen();
2221     HRegion region = getRegion(regionInfo.getRegionName());
2222     compactSplitThread.requestCompaction(region, major, "User-triggered "
2223         + (major ? "major " : "") + "compaction",
2224         CompactSplitThread.PRIORITY_USER);
2225   }
2226 
2227   /** @return the info server */
2228   public InfoServer getInfoServer() {
2229     return infoServer;
2230   }
2231 
2232   /**
2233    * @return true if a stop has been requested.
2234    */
2235   public boolean isStopped() {
2236     return this.stopped;
2237   }
2238 
2239   @Override
2240   public boolean isStopping() {
2241     return this.stopping;
2242   }
2243 
2244   /**
2245    *
2246    * @return the configuration
2247    */
2248   public Configuration getConfiguration() {
2249     return conf;
2250   }
2251 
2252   /** @return the write lock for the server */
2253   ReentrantReadWriteLock.WriteLock getWriteLock() {
2254     return lock.writeLock();
2255   }
2256 
2257   @Override
2258   @QosPriority(priority=HIGH_QOS)
2259   public List<HRegionInfo> getOnlineRegions() {
2260     List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
2261     for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
2262       list.add(e.getValue().getRegionInfo());
2263     }
2264     Collections.sort(list);
2265     return list;
2266   }
2267 
2268   public int getNumberOfOnlineRegions() {
2269     int size = -1;
2270     size = this.onlineRegions.size();
2271     return size;
2272   }
2273 
2274   boolean isOnlineRegionsEmpty() {
2275     return this.onlineRegions.isEmpty();
2276   }
2277 
2278   /**
2279    * For tests and web ui.
2280    * This method will only work if HRegionServer is in the same JVM as client;
2281    * HRegion cannot be serialized to cross an rpc.
2282    * @see #getOnlineRegions()
2283    */
2284   public Collection<HRegion> getOnlineRegionsLocalContext() {
2285     Collection<HRegion> regions = this.onlineRegions.values();
2286     return Collections.unmodifiableCollection(regions);
2287   }
2288 
2289   @Override
2290   public void addToOnlineRegions(HRegion region) {
2291     this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2292   }
2293 
2294   @Override
2295   public boolean removeFromOnlineRegions(final String encodedName) {
2296     HRegion toReturn = null;
2297     toReturn = this.onlineRegions.remove(encodedName);
2298     return toReturn != null;
2299   }
2300 
2301   /**
2302    * @return A new Map of online regions sorted by region size with the first
2303    *         entry being the biggest.
2304    */
2305   public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2306     // we'll sort the regions in reverse
2307     SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2308         new Comparator<Long>() {
2309           public int compare(Long a, Long b) {
2310             return -1 * a.compareTo(b);
2311           }
2312         });
2313     // Copy over all regions. Regions are sorted by size with biggest first.
2314     for (HRegion region : this.onlineRegions.values()) {
2315       sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
2316     }
2317     return sortedRegions;
2318   }
2319 
2320   @Override
2321   public HRegion getFromOnlineRegions(final String encodedRegionName) {
2322     HRegion r = null;
2323     r = this.onlineRegions.get(encodedRegionName);
2324     return r;
2325   }
2326 
2327   /**
2328    * @param regionName
2329    * @return HRegion for the passed binary <code>regionName</code> or null if
2330    *         named region is not member of the online regions.
2331    */
2332   public HRegion getOnlineRegion(final byte[] regionName) {
2333     return getFromOnlineRegions(HRegionInfo.encodeRegionName(regionName));
2334   }
2335 
2336   /** @return the request count */
2337   public AtomicInteger getRequestCount() {
2338     return this.requestCount;
2339   }
2340 
2341   /** @return reference to FlushRequester */
2342   public FlushRequester getFlushRequester() {
2343     return this.cacheFlusher;
2344   }
2345 
2346   /**
2347    * Protected utility method for safely obtaining an HRegion handle.
2348    *
2349    * @param regionName
2350    *          Name of online {@link HRegion} to return
2351    * @return {@link HRegion} for <code>regionName</code>
2352    * @throws NotServingRegionException
2353    */
2354   protected HRegion getRegion(final byte[] regionName)
2355       throws NotServingRegionException {
2356     HRegion region = null;
2357     region = getOnlineRegion(regionName);
2358     if (region == null) {
2359       throw new NotServingRegionException("Region is not online: " +
2360         Bytes.toStringBinary(regionName));
2361     }
2362     return region;
2363   }
2364 
2365   /**
2366    * Get the top N most loaded regions this server is serving so we can tell the
2367    * master which regions it can reallocate if we're overloaded. TODO: actually
2368    * calculate which regions are most loaded. (Right now, we're just grabbing
2369    * the first N regions being served regardless of load.)
2370    */
2371   protected HRegionInfo[] getMostLoadedRegions() {
2372     ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2373     for (HRegion r : onlineRegions.values()) {
2374       if (r.isClosed() || r.isClosing()) {
2375         continue;
2376       }
2377       if (regions.size() < numRegionsToReport) {
2378         regions.add(r.getRegionInfo());
2379       } else {
2380         break;
2381       }
2382     }
2383     return regions.toArray(new HRegionInfo[regions.size()]);
2384   }
2385 
2386   /**
2387    * Called to verify that this server is up and running.
2388    *
2389    * @throws IOException
2390    */
2391   protected void checkOpen() throws IOException {
2392     if (this.stopped || this.abortRequested) {
2393       throw new IOException("Server not running"
2394           + (this.abortRequested ? ", aborting" : ""));
2395     }
2396     if (!fsOk) {
2397       throw new IOException("File system not available");
2398     }
2399   }
2400 
2401   /**
2402    * @return Returns list of non-closed regions hosted on this server. If no
2403    *         regions to check, returns an empty list.
2404    */
2405   protected Set<HRegion> getRegionsToCheck() {
2406     HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
2407     regionsToCheck.addAll(this.onlineRegions.values());
2408     // Purge closed regions.
2409     for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
2410       HRegion r = i.next();
2411       if (r.isClosed()) {
2412         i.remove();
2413       }
2414     }
2415     return regionsToCheck;
2416   }
2417 
2418   @Override
2419   @QosPriority(priority=HIGH_QOS)
2420   public long getProtocolVersion(final String protocol, final long clientVersion)
2421       throws IOException {
2422     if (protocol.equals(HRegionInterface.class.getName())) {
2423       return HBaseRPCProtocolVersion.versionID;
2424     }
2425     throw new IOException("Unknown protocol to name node: " + protocol);
2426   }
2427 
2428   /**
2429    * @return Queue to which you can add outbound messages.
2430    */
2431   protected LinkedBlockingQueue<HMsg> getOutboundMsgs() {
2432     return this.outboundMsgs;
2433   }
2434 
2435   /**
2436    * Return the total size of all memstores in every region.
2437    *
2438    * @return memstore size in bytes
2439    */
2440   public long getGlobalMemStoreSize() {
2441     long total = 0;
2442     for (HRegion region : onlineRegions.values()) {
2443       total += region.memstoreSize.get();
2444     }
2445     return total;
2446   }
2447 
2448   /**
2449    * @return Return the leases.
2450    */
2451   protected Leases getLeases() {
2452     return leases;
2453   }
2454 
2455   /**
2456    * @return Return the rootDir.
2457    */
2458   protected Path getRootDir() {
2459     return rootDir;
2460   }
2461 
2462   /**
2463    * @return Return the fs.
2464    */
2465   protected FileSystem getFileSystem() {
2466     return fs;
2467   }
2468 
2469   /**
2470    * @return Info on port this server has bound to, etc.
2471    */
2472   public HServerInfo getServerInfo() {
2473     return this.serverInfo;
2474   }
2475 
2476 
2477   @Override
2478   public Result increment(byte[] regionName, Increment increment)
2479   throws IOException {
2480     checkOpen();
2481     if (regionName == null) {
2482       throw new IOException("Invalid arguments to increment " +
2483       "regionName is null");
2484     }
2485     requestCount.incrementAndGet();
2486     try {
2487       HRegion region = getRegion(regionName);
2488       return region.increment(increment, getLockFromId(increment.getLockId()),
2489           increment.getWriteToWAL());
2490     } catch (IOException e) {
2491       checkFileSystem();
2492       throw e;
2493     }
2494   }
2495 
2496   /** {@inheritDoc} */
2497   public long incrementColumnValue(byte[] regionName, byte[] row,
2498       byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
2499       throws IOException {
2500     checkOpen();
2501 
2502     if (regionName == null) {
2503       throw new IOException("Invalid arguments to incrementColumnValue "
2504           + "regionName is null");
2505     }
2506     requestCount.incrementAndGet();
2507     try {
2508       HRegion region = getRegion(regionName);
2509       long retval = region.incrementColumnValue(row, family, qualifier, amount,
2510           writeToWAL);
2511 
2512       return retval;
2513     } catch (IOException e) {
2514       checkFileSystem();
2515       throw e;
2516     }
2517   }
2518 
2519   /** {@inheritDoc} */
2520   @Override
2521   @QosPriority(priority=HIGH_QOS)
2522   public HServerInfo getHServerInfo() throws IOException {
2523     checkOpen();
2524     return serverInfo;
2525   }
2526 
2527   @SuppressWarnings("unchecked")
2528   @Override
2529   public MultiResponse multi(MultiAction multi) throws IOException {
2530     checkOpen();
2531     MultiResponse response = new MultiResponse();
2532 
2533     for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
2534       byte[] regionName = e.getKey();
2535       List<Action> actionsForRegion = e.getValue();
2536       // sort based on the row id - this helps in the case where we reach the
2537       // end of a region, so that we don't have to try the rest of the
2538       // actions in the list.
2539       Collections.sort(actionsForRegion);
2540       Row action;
2541       List<Action> puts = new ArrayList<Action>();
2542       for (Action a : actionsForRegion) {
2543         action = a.getAction();
2544         int originalIndex = a.getOriginalIndex();
2545 
2546         try {
2547           if (action instanceof Delete) {
2548             delete(regionName, (Delete) action);
2549             response.add(regionName, originalIndex, new Result());
2550           } else if (action instanceof Get) {
2551             response.add(regionName, originalIndex, get(regionName, (Get) action));
2552           } else if (action instanceof Put) {
2553             puts.add(a);  // wont throw.
2554           } else {
2555             LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
2556             throw new DoNotRetryIOException("Invalid Action, row must be a Get, Delete or Put.");
2557           }
2558         } catch (IOException ex) {
2559           response.add(regionName, originalIndex, ex);
2560         }
2561       }
2562 
2563       // We do the puts with result.put so we can get the batching efficiency
2564       // we so need. All this data munging doesn't seem great, but at least
2565       // we arent copying bytes or anything.
2566       if (!puts.isEmpty()) {
2567         try {
2568           HRegion region = getRegion(regionName);
2569 
2570           if (!region.getRegionInfo().isMetaTable()) {
2571             this.cacheFlusher.reclaimMemStoreMemory();
2572           }
2573 
2574           List<Pair<Put,Integer>> putsWithLocks =
2575               Lists.newArrayListWithCapacity(puts.size());
2576           for (Action a : puts) {
2577             Put p = (Put) a.getAction();
2578 
2579             Integer lock;
2580             try {
2581               lock = getLockFromId(p.getLockId());
2582             } catch (UnknownRowLockException ex) {
2583               response.add(regionName, a.getOriginalIndex(), ex);
2584               continue;
2585             }
2586             putsWithLocks.add(new Pair<Put, Integer>(p, lock));
2587           }
2588 
2589           this.requestCount.addAndGet(puts.size());
2590 
2591           OperationStatusCode[] codes =
2592               region.put(putsWithLocks.toArray(new Pair[]{}));
2593 
2594           for( int i = 0 ; i < codes.length ; i++) {
2595             OperationStatusCode code = codes[i];
2596 
2597             Action theAction = puts.get(i);
2598             Object result = null;
2599 
2600             if (code == OperationStatusCode.SUCCESS) {
2601               result = new Result();
2602             } else if (code == OperationStatusCode.BAD_FAMILY) {
2603               result = new NoSuchColumnFamilyException();
2604             }
2605             // FAILURE && NOT_RUN becomes null, aka: need to run again.
2606 
2607             response.add(regionName, theAction.getOriginalIndex(), result);
2608           }
2609         } catch (IOException ioe) {
2610           // fail all the puts with the ioe in question.
2611           for (Action a: puts) {
2612             response.add(regionName, a.getOriginalIndex(), ioe);
2613           }
2614         }
2615       }
2616     }
2617     return response;
2618   }
2619 
2620   /**
2621    * @deprecated Use HRegionServer.multi( MultiAction action) instead
2622    */
2623   @Override
2624   public MultiPutResponse multiPut(MultiPut puts) throws IOException {
2625     checkOpen();
2626     MultiPutResponse resp = new MultiPutResponse();
2627 
2628     // do each region as it's own.
2629     for (Map.Entry<byte[], List<Put>> e : puts.puts.entrySet()) {
2630       int result = put(e.getKey(), e.getValue());
2631       resp.addResult(e.getKey(), result);
2632 
2633       e.getValue().clear(); // clear some RAM
2634     }
2635 
2636     return resp;
2637   }
2638 
2639   public String toString() {
2640     return this.serverInfo.toString();
2641   }
2642 
2643   /**
2644    * Interval at which threads should run
2645    *
2646    * @return the interval
2647    */
2648   public int getThreadWakeFrequency() {
2649     return threadWakeFrequency;
2650   }
2651 
2652   @Override
2653   public ZooKeeperWatcher getZooKeeper() {
2654     return zooKeeper;
2655   }
2656 
2657   @Override
2658   public String getServerName() {
2659     return serverInfo.getServerName();
2660   }
2661 
2662   @Override
2663   public CompactionRequestor getCompactionRequester() {
2664     return this.compactSplitThread;
2665   }
2666 
2667   public Set<byte[]> getRegionsInTransitionInRS() {
2668     return this.regionsInTransitionInRS;
2669   }
2670 
2671   //
2672   // Main program and support routines
2673   //
2674 
2675   /**
2676    * @param hrs
2677    * @return Thread the RegionServer is running in correctly named.
2678    * @throws IOException
2679    */
2680   public static Thread startRegionServer(final HRegionServer hrs)
2681       throws IOException {
2682     return startRegionServer(hrs, "regionserver"
2683         + hrs.getServerInfo().getServerAddress().getPort());
2684   }
2685 
2686   /**
2687    * @param hrs
2688    * @param name
2689    * @return Thread the RegionServer is running in correctly named.
2690    * @throws IOException
2691    */
2692   public static Thread startRegionServer(final HRegionServer hrs,
2693       final String name) throws IOException {
2694     Thread t = new Thread(hrs);
2695     t.setName(name);
2696     t.start();
2697     // Install shutdown hook that will catch signals and run an orderly shutdown
2698     // of the hrs.
2699     ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2700         .getConfiguration()), hrs, t);
2701     return t;
2702   }
2703 
2704   /**
2705    * Utility for constructing an instance of the passed HRegionServer class.
2706    *
2707    * @param regionServerClass
2708    * @param conf2
2709    * @return HRegionServer instance.
2710    */
2711   public static HRegionServer constructRegionServer(
2712       Class<? extends HRegionServer> regionServerClass,
2713       final Configuration conf2) {
2714     try {
2715       Constructor<? extends HRegionServer> c = regionServerClass
2716           .getConstructor(Configuration.class);
2717       return c.newInstance(conf2);
2718     } catch (Exception e) {
2719       throw new RuntimeException("Failed construction of " + "Regionserver: "
2720           + regionServerClass.toString(), e);
2721     }
2722   }
2723 
2724   @Override
2725   public void replicateLogEntries(final HLog.Entry[] entries)
2726   throws IOException {
2727     checkOpen();
2728     if (this.replicationHandler == null) return;
2729     this.replicationHandler.replicateLogEntries(entries);
2730   }
2731 
2732 
2733   /**
2734    * @see org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine
2735    */
2736   public static void main(String[] args) throws Exception {
2737     Configuration conf = HBaseConfiguration.create();
2738     @SuppressWarnings("unchecked")
2739     Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2740         .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2741 
2742     new HRegionServerCommandLine(regionServerClass).doMain(args);
2743   }
2744 }