1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.regionserver;
21
22 import java.io.IOException;
23 import java.lang.Thread.UncaughtExceptionHandler;
24 import java.lang.annotation.Retention;
25 import java.lang.annotation.RetentionPolicy;
26 import java.lang.management.ManagementFactory;
27 import java.lang.management.MemoryUsage;
28 import java.lang.reflect.Constructor;
29 import java.lang.reflect.Method;
30 import java.net.BindException;
31 import java.net.InetSocketAddress;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Collections;
35 import java.util.Comparator;
36 import java.util.HashMap;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.LinkedList;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.Random;
44 import java.util.Set;
45 import java.util.SortedMap;
46 import java.util.TreeMap;
47 import java.util.concurrent.ConcurrentSkipListSet;
48 import java.util.concurrent.LinkedBlockingQueue;
49 import java.util.concurrent.TimeUnit;
50 import java.util.concurrent.atomic.AtomicBoolean;
51 import java.util.concurrent.atomic.AtomicInteger;
52 import java.util.concurrent.locks.ReentrantReadWriteLock;
53
54 import com.google.common.collect.Lists;
55 import org.apache.commons.logging.Log;
56 import org.apache.commons.logging.LogFactory;
57 import org.apache.hadoop.conf.Configuration;
58 import org.apache.hadoop.fs.FileSystem;
59 import org.apache.hadoop.fs.Path;
60 import org.apache.hadoop.hbase.Chore;
61 import org.apache.hadoop.hbase.ClockOutOfSyncException;
62 import org.apache.hadoop.hbase.DoNotRetryIOException;
63 import org.apache.hadoop.hbase.HBaseConfiguration;
64 import org.apache.hadoop.hbase.HConstants;
65 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
66 import org.apache.hadoop.hbase.HMsg;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.HServerAddress;
69 import org.apache.hadoop.hbase.HServerInfo;
70 import org.apache.hadoop.hbase.HServerLoad;
71 import org.apache.hadoop.hbase.KeyValue;
72 import org.apache.hadoop.hbase.MasterAddressTracker;
73 import org.apache.hadoop.hbase.NotServingRegionException;
74 import org.apache.hadoop.hbase.RemoteExceptionHandler;
75 import org.apache.hadoop.hbase.Server;
76 import org.apache.hadoop.hbase.Stoppable;
77 import org.apache.hadoop.hbase.UnknownRowLockException;
78 import org.apache.hadoop.hbase.UnknownScannerException;
79 import org.apache.hadoop.hbase.YouAreDeadException;
80 import org.apache.hadoop.hbase.catalog.CatalogTracker;
81 import org.apache.hadoop.hbase.catalog.MetaEditor;
82 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
83 import org.apache.hadoop.hbase.client.Action;
84 import org.apache.hadoop.hbase.client.Delete;
85 import org.apache.hadoop.hbase.client.Get;
86 import org.apache.hadoop.hbase.client.HConnection;
87 import org.apache.hadoop.hbase.client.HConnectionManager;
88 import org.apache.hadoop.hbase.client.Increment;
89 import org.apache.hadoop.hbase.client.MultiAction;
90 import org.apache.hadoop.hbase.client.MultiPut;
91 import org.apache.hadoop.hbase.client.MultiPutResponse;
92 import org.apache.hadoop.hbase.client.MultiResponse;
93 import org.apache.hadoop.hbase.client.Put;
94 import org.apache.hadoop.hbase.client.Result;
95 import org.apache.hadoop.hbase.client.Row;
96 import org.apache.hadoop.hbase.client.Scan;
97 import org.apache.hadoop.hbase.executor.ExecutorService;
98 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
99 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
100 import org.apache.hadoop.hbase.io.hfile.LruBlockCache.CacheStats;
101 import org.apache.hadoop.hbase.ipc.HBaseRPC;
102 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
103 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
104 import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
105 import org.apache.hadoop.hbase.ipc.HBaseServer;
106 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
107 import org.apache.hadoop.hbase.ipc.HRegionInterface;
108 import org.apache.hadoop.hbase.ipc.ServerNotRunningException;
109 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
110 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
111 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
112 import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
113 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
114 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
115 import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
116 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
117 import org.apache.hadoop.hbase.regionserver.wal.HLog;
118 import org.apache.hadoop.hbase.regionserver.wal.WALObserver;
119 import org.apache.hadoop.hbase.replication.regionserver.Replication;
120 import org.apache.hadoop.hbase.security.User;
121 import org.apache.hadoop.hbase.util.Bytes;
122 import org.apache.hadoop.hbase.util.CompressionTest;
123 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
124 import org.apache.hadoop.hbase.util.FSUtils;
125 import org.apache.hadoop.hbase.util.InfoServer;
126 import org.apache.hadoop.hbase.util.Pair;
127 import org.apache.hadoop.hbase.util.Sleeper;
128 import org.apache.hadoop.hbase.util.Strings;
129 import org.apache.hadoop.hbase.util.Threads;
130 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
131 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
132 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
133 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
134 import org.apache.hadoop.io.MapWritable;
135 import org.apache.hadoop.io.Writable;
136 import org.apache.hadoop.ipc.RemoteException;
137 import org.apache.hadoop.net.DNS;
138 import org.apache.zookeeper.KeeperException;
139
140 import com.google.common.base.Function;
141
142
143
144
145
146 public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
147 Runnable, RegionServerServices, Server {
148 public static final Log LOG = LogFactory.getLog(HRegionServer.class);
149
150
151
152
153 protected volatile boolean stopped = false;
154
155
156
157 private boolean stopping = false;
158
159
160
161 protected volatile boolean abortRequested;
162
163 private volatile boolean killed = false;
164
165
166 protected volatile boolean fsOk;
167
168 protected HServerInfo serverInfo;
169 protected final Configuration conf;
170
171 private final HConnection connection;
172 protected final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
173 private FileSystem fs;
174 private Path rootDir;
175 private final Random rand = new Random();
176
177
178
179
180
181 protected final Map<String, HRegion> onlineRegions =
182 new ConcurrentHashMap<String, HRegion>();
183
184 protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
185 private final LinkedBlockingQueue<HMsg> outboundMsgs = new LinkedBlockingQueue<HMsg>();
186
187 final int numRetries;
188 protected final int threadWakeFrequency;
189 private final int msgInterval;
190
191 protected final int numRegionsToReport;
192
193 private final long maxScannerResultSize;
194
195
196 private HMasterRegionInterface hbaseMaster;
197
198
199
200 HBaseServer server;
201
202
203 private Leases leases;
204
205
206 private volatile AtomicInteger requestCount = new AtomicInteger();
207
208
209
210
211 InfoServer infoServer;
212
213
214 public static final String REGIONSERVER = "regionserver";
215
216
217
218
219
220
221 private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
222
223 private RegionServerMetrics metrics;
224
225
226 CompactSplitThread compactSplitThread;
227
228
229 MemStoreFlusher cacheFlusher;
230
231
232
233
234 Chore majorCompactionChecker;
235
236
237
238 protected volatile HLog hlog;
239 LogRoller hlogRoller;
240
241
242 protected volatile boolean isOnline;
243
244 final Map<String, InternalScanner> scanners = new ConcurrentHashMap<String, InternalScanner>();
245
246
247 private ZooKeeperWatcher zooKeeper;
248
249
250 private MasterAddressTracker masterAddressManager;
251
252
253 private CatalogTracker catalogTracker;
254
255
256 private ClusterStatusTracker clusterStatusTracker;
257
258
259 private final Sleeper sleeper;
260
261 private final int rpcTimeout;
262
263
264 @SuppressWarnings("unused")
265 private Thread regionServerThread;
266
267
268 private ExecutorService service;
269
270
271 private Replication replicationHandler;
272
273 private final Set<byte[]> regionsInTransitionInRS =
274 new ConcurrentSkipListSet<byte[]>(Bytes.BYTES_COMPARATOR);
275
276
277
278
279
280
281
282
283 public HRegionServer(Configuration conf) throws IOException, InterruptedException {
284 this.fsOk = true;
285 this.conf = conf;
286 this.connection = HConnectionManager.getConnection(conf);
287 this.isOnline = false;
288
289
290 String [] codecs = conf.getStrings("hbase.regionserver.codecs",
291 (String[])null);
292 if (codecs != null) {
293 for (String codec : codecs) {
294 if (!CompressionTest.testCompression(codec)) {
295 throw new IOException("Compression codec " + codec +
296 " not supported, aborting RS construction");
297 }
298 }
299 }
300
301
302 this.numRetries = conf.getInt("hbase.client.retries.number", 10);
303 this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
304 10 * 1000);
305 this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
306
307 sleeper = new Sleeper(this.msgInterval, this);
308
309 this.maxScannerResultSize = conf.getLong(
310 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
311 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
312
313 this.numRegionsToReport = conf.getInt(
314 "hbase.regionserver.numregionstoreport", 10);
315
316 this.rpcTimeout = conf.getInt(
317 HConstants.HBASE_RPC_TIMEOUT_KEY,
318 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
319
320 this.abortRequested = false;
321 this.stopped = false;
322
323
324 String machineName = Strings.domainNamePointerToHostName(
325 DNS.getDefaultHost(conf.get("hbase.regionserver.dns.interface", "default")
326 , conf.get("hbase.regionserver.dns.nameserver", "default")));
327 String addressStr = machineName + ":" +
328 conf.get(HConstants.REGIONSERVER_PORT,
329 Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
330 HServerAddress address = new HServerAddress(addressStr);
331 this.server = HBaseRPC.getServer(this,
332 new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
333 OnlineRegions.class},
334 address.getBindAddress(),
335 address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
336 conf.getInt("hbase.regionserver.metahandler.count", 10),
337 false, conf, QOS_THRESHOLD);
338 this.server.setErrorHandler(this);
339 this.server.setQosFunction(new QosFunction());
340
341
342 this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
343 address.getBindAddress(), this.server.getListenerAddress().getPort())),
344 System.currentTimeMillis(), this.conf.getInt(
345 "hbase.regionserver.info.port", 60030), machineName);
346 if (this.serverInfo.getServerAddress() == null) {
347 throw new NullPointerException("Server address cannot be null; "
348 + "hbase-958 debugging");
349 }
350
351
352 User.login(conf, "hbase.regionserver.keytab.file",
353 "hbase.regionserver.kerberos.principal", serverInfo.getHostname());
354 }
355
356 private static final int NORMAL_QOS = 0;
357 private static final int QOS_THRESHOLD = 10;
358 private static final int HIGH_QOS = 100;
359
360 @Retention(RetentionPolicy.RUNTIME)
361 private @interface QosPriority {
362 int priority() default 0;
363 }
364
365 class QosFunction implements Function<Writable,Integer> {
366 private final Map<String, Integer> annotatedQos;
367
368 public QosFunction() {
369 Map<String, Integer> qosMap = new HashMap<String, Integer>();
370 for (Method m : HRegionServer.class.getMethods()) {
371 QosPriority p = m.getAnnotation(QosPriority.class);
372 if (p != null) {
373 qosMap.put(m.getName(), p.priority());
374 }
375 }
376
377 annotatedQos = qosMap;
378 }
379
380 public boolean isMetaRegion(byte[] regionName) {
381 HRegion region;
382 try {
383 region = getRegion(regionName);
384 } catch (NotServingRegionException ignored) {
385 return false;
386 }
387 return region.getRegionInfo().isMetaRegion();
388 }
389
390 @Override
391 public Integer apply(Writable from) {
392 if (!(from instanceof HBaseRPC.Invocation)) return NORMAL_QOS;
393
394 HBaseRPC.Invocation inv = (HBaseRPC.Invocation) from;
395 String methodName = inv.getMethodName();
396
397 Integer priorityByAnnotation = annotatedQos.get(methodName);
398 if (priorityByAnnotation != null) {
399 return priorityByAnnotation;
400 }
401
402
403 if (methodName.equals("next") || methodName.equals("close")) {
404
405 Long scannerId;
406 try {
407 scannerId = (Long) inv.getParameters()[0];
408 } catch (ClassCastException ignored) {
409
410 return NORMAL_QOS;
411 }
412 String scannerIdString = Long.toString(scannerId);
413 InternalScanner scanner = scanners.get(scannerIdString);
414 if (scanner instanceof HRegion.RegionScanner) {
415 HRegion.RegionScanner rs = (HRegion.RegionScanner) scanner;
416 HRegionInfo regionName = rs.getRegionName();
417 if (regionName.isMetaRegion()) {
418
419 return HIGH_QOS;
420 }
421 }
422 } else if (inv.getParameterClasses().length == 0) {
423
424 } else if (inv.getParameterClasses()[0] == byte[].class) {
425
426 if (isMetaRegion((byte[]) inv.getParameters()[0])) {
427
428
429
430 return HIGH_QOS;
431 }
432 } else if (inv.getParameterClasses()[0] == MultiAction.class) {
433 MultiAction ma = (MultiAction) inv.getParameters()[0];
434 Set<byte[]> regions = ma.getRegions();
435
436
437
438
439
440
441
442 for (byte[] region : regions) {
443 if (isMetaRegion(region)) {
444
445
446 return HIGH_QOS;
447 }
448 }
449 }
450
451 return NORMAL_QOS;
452 }
453 }
454
455
456
457
458
459
460
461
462
463 private void initialize() throws IOException, InterruptedException {
464 try {
465 initializeZooKeeper();
466 initializeThreads();
467 int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
468 for (int i = 0; i < nbBlocks; i++) {
469 reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
470 }
471 } catch (Throwable t) {
472
473
474 LOG.error("Stopping HRS because failed initialize", t);
475 this.server.stop();
476 }
477 }
478
479
480
481
482
483
484
485
486
487 private void initializeZooKeeper() throws IOException, InterruptedException {
488
489 zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
490 serverInfo.getServerAddress().getPort(), this);
491
492
493
494
495 this.masterAddressManager = new MasterAddressTracker(this.zooKeeper, this);
496 this.masterAddressManager.start();
497 blockAndCheckIfStopped(this.masterAddressManager);
498
499
500
501 this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
502 this.clusterStatusTracker.start();
503 blockAndCheckIfStopped(this.clusterStatusTracker);
504
505
506 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
507 this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
508 catalogTracker.start();
509 }
510
511
512
513
514
515
516
517
518 private void blockAndCheckIfStopped(ZooKeeperNodeTracker tracker)
519 throws IOException, InterruptedException {
520 while (tracker.blockUntilAvailable(this.msgInterval) == null) {
521 if (this.stopped) {
522 throw new IOException("Received the shutdown message while waiting.");
523 }
524 }
525 }
526
527
528
529
530 private boolean isClusterUp() {
531 return this.clusterStatusTracker.isClusterUp();
532 }
533
534 private void initializeThreads() throws IOException {
535
536
537 this.cacheFlusher = new MemStoreFlusher(conf, this);
538
539
540 this.compactSplitThread = new CompactSplitThread(this);
541
542
543
544 int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY
545 + ".multiplier", 1000);
546 this.majorCompactionChecker = new MajorCompactionChecker(this,
547 this.threadWakeFrequency * multiplier, this);
548
549 this.leases = new Leases((int) conf.getLong(
550 HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
551 HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
552 this.threadWakeFrequency);
553 }
554
555
556
557
558
559
560 public void run() {
561
562 try {
563
564 initialize();
565 } catch (Exception e) {
566 abort("Fatal exception during initialization", e);
567 }
568
569 this.regionServerThread = Thread.currentThread();
570 try {
571 while (!this.stopped) {
572 if (tryReportForDuty()) break;
573 }
574 long lastMsg = 0;
575 List<HMsg> outboundMessages = new ArrayList<HMsg>();
576
577 for (int tries = 0; !this.stopped && isHealthy();) {
578 if (!isClusterUp()) {
579 if (isOnlineRegionsEmpty()) {
580 stop("Exiting; cluster shutdown set and not carrying any regions");
581 } else if (!this.stopping) {
582 this.stopping = true;
583 closeUserRegions(this.abortRequested);
584 } else if (this.stopping && LOG.isDebugEnabled()) {
585 LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
586 }
587 }
588 long now = System.currentTimeMillis();
589
590
591
592 if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
593 try {
594 doMetrics();
595 tryRegionServerReport(outboundMessages);
596 lastMsg = System.currentTimeMillis();
597
598 tries = 0;
599 if (this.stopped) continue;
600 } catch (Exception e) {
601
602
603 if (e instanceof IOException) {
604 e = RemoteExceptionHandler.checkIOException((IOException) e);
605 }
606 if (e instanceof YouAreDeadException) {
607
608 throw e;
609 }
610 tries++;
611 if (tries > 0 && (tries % this.numRetries) == 0) {
612
613 checkFileSystem();
614 }
615 if (this.stopped) {
616 continue;
617 }
618 LOG.warn("Attempt=" + tries, e);
619
620
621 lastMsg = System.currentTimeMillis();
622 }
623 }
624 now = System.currentTimeMillis();
625 HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), TimeUnit.MILLISECONDS);
626 if (msg != null) outboundMessages.add(msg);
627 }
628 } catch (Throwable t) {
629 if (!checkOOME(t)) {
630 abort("Unhandled exception: " + t.getMessage(), t);
631 }
632 }
633 this.leases.closeAfterLeasesExpire();
634 this.server.stop();
635 if (this.infoServer != null) {
636 LOG.info("Stopping infoServer");
637 try {
638 this.infoServer.stop();
639 } catch (Exception e) {
640 e.printStackTrace();
641 }
642 }
643
644 LruBlockCache c = (LruBlockCache) StoreFile.getBlockCache(this.conf);
645 if (c != null) {
646 c.shutdown();
647 }
648
649
650
651 if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary();
652 if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary();
653 if (this.hlogRoller != null) this.hlogRoller.interruptIfNecessary();
654 if (this.majorCompactionChecker != null) this.majorCompactionChecker.interrupt();
655
656 if (this.killed) {
657
658 } else if (abortRequested) {
659 if (this.fsOk) {
660 closeAllRegions(abortRequested);
661 closeWAL(false);
662 }
663 LOG.info("aborting server at: " + this.serverInfo.getServerName());
664 } else {
665 closeAllRegions(abortRequested);
666 closeWAL(true);
667 closeAllScanners();
668 LOG.info("stopping server at: " + this.serverInfo.getServerName());
669 }
670
671
672 if (this.catalogTracker != null) this.catalogTracker.stop();
673 if (this.fsOk) waitOnAllRegionsToClose();
674
675
676 if (this.hbaseMaster != null) {
677 HBaseRPC.stopProxy(this.hbaseMaster);
678 this.hbaseMaster = null;
679 }
680 this.leases.close();
681 HConnectionManager.deleteConnection(conf, true);
682 this.zooKeeper.close();
683 if (!killed) {
684 join();
685 }
686 LOG.info(Thread.currentThread().getName() + " exiting");
687 }
688
689 String getOnlineRegionsAsPrintableString() {
690 StringBuilder sb = new StringBuilder();
691 for (HRegion r: this.onlineRegions.values()) {
692 if (sb.length() > 0) sb.append(", ");
693 sb.append(r.getRegionInfo().getEncodedName());
694 }
695 return sb.toString();
696 }
697
698
699
700
701 private void waitOnAllRegionsToClose() {
702
703 int lastCount = -1;
704 while (!isOnlineRegionsEmpty()) {
705 int count = getNumberOfOnlineRegions();
706
707 if (count != lastCount) {
708 lastCount = count;
709 LOG.info("Waiting on " + count + " regions to close");
710
711
712 if (count < 10 && LOG.isDebugEnabled()) {
713 LOG.debug(this.onlineRegions);
714 }
715 }
716 Threads.sleep(1000);
717 }
718 }
719
720 List<HMsg> tryRegionServerReport(final List<HMsg> outboundMessages)
721 throws IOException {
722 this.serverInfo.setLoad(buildServerLoad());
723 this.requestCount.set(0);
724 addOutboundMsgs(outboundMessages);
725 HMsg [] msgs = null;
726 while (!this.stopped) {
727 try {
728 msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
729 outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
730 getMostLoadedRegions());
731 break;
732 } catch (IOException ioe) {
733 if (ioe instanceof RemoteException) {
734 ioe = ((RemoteException)ioe).unwrapRemoteException();
735 }
736 if (ioe instanceof YouAreDeadException) {
737
738 throw ioe;
739 }
740 LOG.warn("RemoteException connecting to master", ioe);
741
742
743 getMaster();
744 }
745 }
746 updateOutboundMsgs(outboundMessages);
747 outboundMessages.clear();
748
749 for (int i = 0; !this.stopped && msgs != null && i < msgs.length; i++) {
750 LOG.info(msgs[i].toString());
751
752 if (msgs[i].getType().equals(HMsg.Type.STOP_REGIONSERVER)) {
753 stop("Received " + msgs[i]);
754 continue;
755 }
756 LOG.warn("NOT PROCESSING " + msgs[i] + " -- WHY IS MASTER SENDING IT TO US?");
757 }
758 return outboundMessages;
759 }
760
761 private HServerLoad buildServerLoad() {
762 MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
763 HServerLoad hsl = new HServerLoad(requestCount.get(),
764 (int)(memory.getUsed() / 1024 / 1024),
765 (int) (memory.getMax() / 1024 / 1024));
766 for (HRegion r : this.onlineRegions.values()) {
767 hsl.addRegionInfo(createRegionLoad(r));
768 }
769 return hsl;
770 }
771
772 private void closeWAL(final boolean delete) {
773 try {
774 if (this.hlog != null) {
775 if (delete) {
776 hlog.closeAndDelete();
777 } else {
778 hlog.close();
779 }
780 }
781 } catch (Throwable e) {
782 LOG.error("Close and delete failed", RemoteExceptionHandler.checkThrowable(e));
783 }
784 }
785
786 private void closeAllScanners() {
787
788
789 for (Map.Entry<String, InternalScanner> e : this.scanners.entrySet()) {
790 try {
791 e.getValue().close();
792 } catch (IOException ioe) {
793 LOG.warn("Closing scanner " + e.getKey(), ioe);
794 }
795 }
796 }
797
798
799
800
801
802
803 private void addOutboundMsgs(final List<HMsg> msgs) {
804 if (msgs.isEmpty()) {
805 this.outboundMsgs.drainTo(msgs);
806 return;
807 }
808 OUTER: for (HMsg m : this.outboundMsgs) {
809 for (HMsg mm : msgs) {
810
811 if (mm.equals(m)) {
812 continue OUTER;
813 }
814 }
815 msgs.add(m);
816 }
817 }
818
819
820
821
822
823
824 private void updateOutboundMsgs(final List<HMsg> msgs) {
825 if (msgs.isEmpty()) {
826 return;
827 }
828 for (HMsg m : this.outboundMsgs) {
829 for (HMsg mm : msgs) {
830 if (mm.equals(m)) {
831 this.outboundMsgs.remove(m);
832 break;
833 }
834 }
835 }
836 }
837
838
839
840
841
842
843 protected void handleReportForDutyResponse(final MapWritable c) throws IOException {
844 try {
845 for (Map.Entry<Writable, Writable> e : c.entrySet()) {
846
847 String key = e.getKey().toString();
848
849 if (key.equals("hbase.regionserver.address")) {
850 HServerAddress hsa = (HServerAddress) e.getValue();
851 LOG.info("Master passed us address to use. Was="
852 + this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
853 this.serverInfo.setServerAddress(hsa);
854 continue;
855 }
856 String value = e.getValue().toString();
857 if (LOG.isDebugEnabled()) {
858 LOG.debug("Config from master: " + key + "=" + value);
859 }
860 this.conf.set(key, value);
861 }
862
863
864 if (this.conf.get("mapred.task.id") == null) {
865 this.conf.set("mapred.task.id",
866 "hb_rs_" + this.serverInfo.getServerName() + "_" +
867 System.currentTimeMillis());
868 }
869
870
871
872
873
874
875 this.conf.set("fs.defaultFS", this.conf.get("hbase.rootdir"));
876
877 this.fs = FileSystem.get(this.conf);
878 this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
879 this.hlog = setupWALAndReplication();
880
881 this.metrics = new RegionServerMetrics();
882 startServiceThreads();
883 LOG.info("Serving as " + this.serverInfo.getServerName() +
884 ", RPC listening on " + this.server.getListenerAddress() +
885 ", sessionid=0x" +
886 Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
887 isOnline = true;
888 } catch (Throwable e) {
889 this.isOnline = false;
890 stop("Failed initialization");
891 throw convertThrowableToIOE(cleanup(e, "Failed init"),
892 "Region server startup failed");
893 }
894 }
895
896
897
898
899
900
901
902
903 private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
904 byte[] name = r.getRegionName();
905 int stores = 0;
906 int storefiles = 0;
907 int storefileSizeMB = 0;
908 int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
909 int storefileIndexSizeMB = 0;
910 synchronized (r.stores) {
911 stores += r.stores.size();
912 for (Store store : r.stores.values()) {
913 storefiles += store.getStorefilesCount();
914 storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
915 storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
916 }
917 }
918 return new HServerLoad.RegionLoad(name, stores, storefiles,
919 storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB);
920 }
921
922
923
924
925
926
927 public HServerLoad.RegionLoad createRegionLoad(final String encodedRegionName) {
928 HRegion r = null;
929 r = this.onlineRegions.get(encodedRegionName);
930 return r != null ? createRegionLoad(r) : null;
931 }
932
933
934
935
936
937
938
939
940
941 private Throwable cleanup(final Throwable t) {
942 return cleanup(t, null);
943 }
944
945
946
947
948
949
950
951
952
953
954
955 private Throwable cleanup(final Throwable t, final String msg) {
956
957 if (t instanceof NotServingRegionException) {
958 LOG.debug("NotServingRegionException; " + t.getMessage());
959 return t;
960 }
961 if (msg == null) {
962 LOG.error("", RemoteExceptionHandler.checkThrowable(t));
963 } else {
964 LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
965 }
966 if (!checkOOME(t)) {
967 checkFileSystem();
968 }
969 return t;
970 }
971
972
973
974
975
976
977 private IOException convertThrowableToIOE(final Throwable t) {
978 return convertThrowableToIOE(t, null);
979 }
980
981
982
983
984
985
986
987
988 private IOException convertThrowableToIOE(final Throwable t, final String msg) {
989 return (t instanceof IOException ? (IOException) t : msg == null
990 || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
991 }
992
993
994
995
996
997
998
999
1000 public boolean checkOOME(final Throwable e) {
1001 boolean stop = false;
1002 if (e instanceof OutOfMemoryError
1003 || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
1004 || (e.getMessage() != null && e.getMessage().contains(
1005 "java.lang.OutOfMemoryError"))) {
1006 abort("OutOfMemoryError, aborting", e);
1007 stop = true;
1008 }
1009 return stop;
1010 }
1011
1012
1013
1014
1015
1016
1017
1018 protected boolean checkFileSystem() {
1019 if (this.fsOk && this.fs != null) {
1020 try {
1021 FSUtils.checkFileSystemAvailable(this.fs);
1022 } catch (IOException e) {
1023 abort("File System not available", e);
1024 this.fsOk = false;
1025 }
1026 }
1027 return this.fsOk;
1028 }
1029
1030
1031
1032
1033
1034 private static class MajorCompactionChecker extends Chore {
1035 private final HRegionServer instance;
1036 private int majorCompactPriority;
1037 private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
1038
1039
1040 MajorCompactionChecker(final HRegionServer h, final int sleepTime,
1041 final Stoppable stopper) {
1042 super("MajorCompactionChecker", sleepTime, h);
1043 this.instance = h;
1044 LOG.info("Runs every " + sleepTime + "ms");
1045
1046
1047
1048
1049
1050 majorCompactPriority = this.instance.conf.getInt(
1051 "hbase.regionserver.compactionChecker.majorCompactPriority",
1052 DEFAULT_PRIORITY);
1053 }
1054
1055 @Override
1056 protected void chore() {
1057 for (HRegion r : this.instance.onlineRegions.values()) {
1058 try {
1059 if (r != null && r.isMajorCompaction()) {
1060
1061 if(majorCompactPriority == DEFAULT_PRIORITY ||
1062 majorCompactPriority > r.getCompactPriority()){
1063 this.instance.compactSplitThread.requestCompaction(r, getName()
1064 + " requests major compaction use default priority");
1065 } else {
1066 this.instance.compactSplitThread.requestCompaction(r, getName()
1067 + " requests major compaction use configured priority",
1068 this.majorCompactPriority);
1069 }
1070 }
1071 } catch (IOException e) {
1072 LOG.warn("Failed major compaction check on " + r, e);
1073 }
1074 }
1075 }
1076 }
1077
1078
1079
1080
1081
1082
1083
1084
1085 public boolean isOnline() {
1086 return isOnline;
1087 }
1088
1089
1090
1091
1092
1093
1094
1095 private HLog setupWALAndReplication() throws IOException {
1096 final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
1097 Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
1098 if (LOG.isDebugEnabled()) {
1099 LOG.debug("logdir=" + logdir);
1100 }
1101 if (this.fs.exists(logdir)) {
1102 throw new RegionServerRunningException("Region server already "
1103 + "running at " + this.serverInfo.getServerName()
1104 + " because logdir " + logdir.toString() + " exists");
1105 }
1106
1107
1108
1109 try {
1110 this.replicationHandler = Replication.isReplication(this.conf)?
1111 new Replication(this, this.fs, logdir, oldLogDir): null;
1112 } catch (KeeperException e) {
1113 throw new IOException("Failed replication handler create", e);
1114 }
1115 return instantiateHLog(logdir, oldLogDir);
1116 }
1117
1118
1119
1120
1121
1122
1123
1124
1125 protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
1126 return new HLog(this.fs, logdir, oldLogDir, this.conf,
1127 getWALActionListeners(), this.serverInfo.getServerAddress().toString());
1128 }
1129
1130
1131
1132
1133
1134
1135
1136 protected List<WALObserver> getWALActionListeners() {
1137 List<WALObserver> listeners = new ArrayList<WALObserver>();
1138
1139 this.hlogRoller = new LogRoller(this, this);
1140 listeners.add(this.hlogRoller);
1141 if (this.replicationHandler != null) {
1142
1143 listeners.add(this.replicationHandler);
1144 }
1145 return listeners;
1146 }
1147
1148 protected LogRoller getLogRoller() {
1149 return hlogRoller;
1150 }
1151
1152
1153
1154
1155 protected void doMetrics() {
1156 try {
1157 metrics();
1158 } catch (Throwable e) {
1159 LOG.warn("Failed metrics", e);
1160 }
1161 }
1162
1163 protected void metrics() {
1164 int seconds = this.msgInterval / 1000;
1165 if(0 == seconds){
1166 seconds = 1;
1167 }
1168 this.metrics.regions.set(this.onlineRegions.size());
1169 this.metrics.requests.set(this.requestCount.get()/seconds);
1170
1171
1172
1173 int stores = 0;
1174 int storefiles = 0;
1175 long memstoreSize = 0;
1176 long storefileIndexSize = 0;
1177 for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
1178 HRegion r = e.getValue();
1179 memstoreSize += r.memstoreSize.get();
1180 synchronized (r.stores) {
1181 stores += r.stores.size();
1182 for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
1183 Store store = ee.getValue();
1184 storefiles += store.getStorefilesCount();
1185 storefileIndexSize += store.getStorefilesIndexSize();
1186 }
1187 }
1188 }
1189 this.metrics.stores.set(stores);
1190 this.metrics.storefiles.set(storefiles);
1191 this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
1192 this.metrics.storefileIndexSizeMB
1193 .set((int) (storefileIndexSize / (1024 * 1024)));
1194 this.metrics.compactionQueueSize.set(compactSplitThread
1195 .getCompactionQueueSize());
1196 this.metrics.flushQueueSize.set(cacheFlusher
1197 .getFlushQueueSize());
1198
1199 LruBlockCache lruBlockCache = (LruBlockCache) StoreFile.getBlockCache(conf);
1200 if (lruBlockCache != null) {
1201 this.metrics.blockCacheCount.set(lruBlockCache.size());
1202 this.metrics.blockCacheFree.set(lruBlockCache.getFreeSize());
1203 this.metrics.blockCacheSize.set(lruBlockCache.getCurrentSize());
1204 CacheStats cacheStats = lruBlockCache.getStats();
1205 this.metrics.blockCacheHitCount.set(cacheStats.getHitCount());
1206 this.metrics.blockCacheMissCount.set(cacheStats.getMissCount());
1207 this.metrics.blockCacheEvictedCount.set(lruBlockCache.getEvictedCount());
1208 double ratio = lruBlockCache.getStats().getHitRatio();
1209 int percent = (int) (ratio * 100);
1210 this.metrics.blockCacheHitRatio.set(percent);
1211 ratio = lruBlockCache.getStats().getHitCachingRatio();
1212 percent = (int) (ratio * 100);
1213 this.metrics.blockCacheHitCachingRatio.set(percent);
1214 }
1215 }
1216
1217
1218
1219
1220 public RegionServerMetrics getMetrics() {
1221 return this.metrics;
1222 }
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236 private void startServiceThreads() throws IOException {
1237 String n = Thread.currentThread().getName();
1238 UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
1239 public void uncaughtException(Thread t, Throwable e) {
1240 abort("Uncaught exception in service thread " + t.getName(), e);
1241 }
1242 };
1243
1244
1245 this.service = new ExecutorService(getServerName());
1246 this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
1247 conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
1248 this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
1249 conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
1250 this.service.startExecutorService(ExecutorType.RS_OPEN_META,
1251 conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
1252 this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
1253 conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
1254 this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
1255 conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
1256 this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
1257 conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
1258
1259 Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
1260 Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
1261 handler);
1262 Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
1263 handler);
1264 Threads.setDaemonThreadRunning(this.majorCompactionChecker, n
1265 + ".majorCompactionChecker", handler);
1266
1267
1268
1269 this.leases.setName(n + ".leaseChecker");
1270 this.leases.start();
1271
1272 int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
1273
1274 if (port >= 0) {
1275 String addr = this.conf.get("hbase.regionserver.info.bindAddress",
1276 "0.0.0.0");
1277
1278 boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto",
1279 false);
1280 while (true) {
1281 try {
1282 this.infoServer = new InfoServer("regionserver", addr, port, false);
1283 this.infoServer.setAttribute("regionserver", this);
1284 this.infoServer.start();
1285 break;
1286 } catch (BindException e) {
1287 if (!auto) {
1288
1289 throw e;
1290 }
1291
1292 LOG.info("Failed binding http info server to port: " + port);
1293 port++;
1294
1295 this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
1296 this.serverInfo.getStartCode(), port,
1297 this.serverInfo.getHostname());
1298 }
1299 }
1300 }
1301
1302 if (this.replicationHandler != null) {
1303 this.replicationHandler.startReplicationServices();
1304 }
1305
1306
1307
1308 this.server.start();
1309 }
1310
1311
1312
1313
1314 private boolean isHealthy() {
1315 if (!fsOk) {
1316
1317 return false;
1318 }
1319
1320 if (!(leases.isAlive() && compactSplitThread.isAlive()
1321 && cacheFlusher.isAlive() && hlogRoller.isAlive()
1322 && this.majorCompactionChecker.isAlive())) {
1323 stop("One or more threads are no longer alive -- stop");
1324 return false;
1325 }
1326 return true;
1327 }
1328
1329 @Override
1330 public HLog getWAL() {
1331 return this.hlog;
1332 }
1333
1334 @Override
1335 public CatalogTracker getCatalogTracker() {
1336 return this.catalogTracker;
1337 }
1338
1339 @Override
1340 public void stop(final String msg) {
1341 this.stopped = true;
1342 LOG.info("STOPPED: " + msg);
1343 synchronized (this) {
1344
1345 notifyAll();
1346 }
1347 }
1348
1349 @Override
1350 public void postOpenDeployTasks(final HRegion r, final CatalogTracker ct,
1351 final boolean daughter)
1352 throws KeeperException, IOException {
1353
1354 if (r.hasReferences() || r.hasTooManyStoreFiles()) {
1355 getCompactionRequester().requestCompaction(r,
1356 r.hasReferences()? "Region has references on open" :
1357 "Region has too many store files");
1358 }
1359
1360
1361 addToOnlineRegions(r);
1362
1363
1364 if (r.getRegionInfo().isRootRegion()) {
1365 RootLocationEditor.setRootLocation(getZooKeeper(),
1366 getServerInfo().getServerAddress());
1367 } else if (r.getRegionInfo().isMetaRegion()) {
1368 MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
1369 } else {
1370 if (daughter) {
1371
1372 MetaEditor.addDaughter(ct, r.getRegionInfo(), getServerInfo());
1373 } else {
1374 MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
1375 }
1376 }
1377 }
1378
1379
1380
1381
1382
1383 public HBaseRpcMetrics getRpcMetrics() {
1384 return server.getRpcMetrics();
1385 }
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397 public void abort(String reason, Throwable cause) {
1398 if (cause != null) {
1399 LOG.fatal("ABORTING region server " + this + ": " + reason, cause);
1400 } else {
1401 LOG.fatal("ABORTING region server " + this + ": " + reason);
1402 }
1403 this.abortRequested = true;
1404 this.reservedSpace.clear();
1405 if (this.metrics != null) {
1406 LOG.info("Dump of metrics: " + this.metrics);
1407 }
1408 stop(reason);
1409 }
1410
1411
1412
1413
1414 public void abort(String reason) {
1415 abort(reason, null);
1416 }
1417
1418
1419
1420
1421
1422
1423 protected void kill() {
1424 this.killed = true;
1425 abort("Simulated kill");
1426 }
1427
1428
1429
1430
1431
1432 protected void join() {
1433 Threads.shutdown(this.majorCompactionChecker);
1434 Threads.shutdown(this.cacheFlusher);
1435 Threads.shutdown(this.compactSplitThread);
1436 Threads.shutdown(this.hlogRoller);
1437 this.service.shutdown();
1438 if (this.replicationHandler != null) {
1439 this.replicationHandler.join();
1440 }
1441 }
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451 private HServerAddress getMaster() {
1452 HServerAddress masterAddress = null;
1453 HMasterRegionInterface master = null;
1454
1455 while (!stopped && master == null) {
1456
1457 masterAddress = getMasterAddress();
1458 LOG.info("Attempting connect to Master server at " + masterAddress);
1459 try {
1460
1461
1462 master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
1463 HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
1464 masterAddress.getInetSocketAddress(), this.conf, -1,
1465 this.rpcTimeout, this.rpcTimeout);
1466 } catch (IOException e) {
1467 e = e instanceof RemoteException ?
1468 ((RemoteException)e).unwrapRemoteException() : e;
1469 if (e instanceof ServerNotRunningException) {
1470 LOG.info("Master isn't available yet, retrying");
1471 } else {
1472 LOG.warn("Unable to connect to master. Retrying. Error was:", e);
1473 }
1474 sleeper.sleep();
1475 }
1476 }
1477 LOG.info("Connected to master at " + masterAddress);
1478 this.hbaseMaster = master;
1479 return masterAddress;
1480 }
1481
1482 private HServerAddress getMasterAddress() {
1483 HServerAddress masterAddress = null;
1484 while ((masterAddress = masterAddressManager.getMasterAddress()) == null) {
1485 if (stopped) {
1486 return null;
1487 }
1488 LOG.debug("No master found, will retry");
1489 sleeper.sleep();
1490 }
1491 return masterAddress;
1492 }
1493
1494
1495
1496
1497
1498 private boolean tryReportForDuty() throws IOException {
1499 MapWritable w = reportForDuty();
1500 if (w != null) {
1501 handleReportForDutyResponse(w);
1502 return true;
1503 }
1504 sleeper.sleep();
1505 LOG.warn("No response on reportForDuty. Sleeping and then retrying.");
1506 return false;
1507 }
1508
1509
1510
1511
1512
1513 private MapWritable reportForDuty() throws IOException {
1514 HServerAddress masterAddress = null;
1515 while (!stopped && (masterAddress = getMaster()) == null) {
1516 sleeper.sleep();
1517 LOG.warn("Unable to get master for initialization");
1518 }
1519
1520 MapWritable result = null;
1521 long lastMsg = 0;
1522
1523
1524
1525 boolean recheckMasterAddr = false;
1526 while (!stopped) {
1527 try {
1528 if (recheckMasterAddr) {
1529 masterAddress = getMaster();
1530 }
1531 this.requestCount.set(0);
1532 lastMsg = System.currentTimeMillis();
1533 ZKUtil.setAddressAndWatch(zooKeeper,
1534 ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)),
1535 this.serverInfo.getServerAddress());
1536 this.serverInfo.setLoad(buildServerLoad());
1537 LOG.info("Telling master at " + masterAddress + " that we are up");
1538 result = this.hbaseMaster.regionServerStartup(this.serverInfo,
1539 EnvironmentEdgeManager.currentTimeMillis());
1540 break;
1541 } catch (RemoteException e) {
1542 IOException ioe = e.unwrapRemoteException();
1543 if (ioe instanceof ClockOutOfSyncException) {
1544 LOG.fatal("Master rejected startup because clock is out of sync",
1545 ioe);
1546
1547 throw ioe;
1548 } else {
1549 recheckMasterAddr = true;
1550 LOG.warn("remote error telling master we are up", e);
1551 }
1552 } catch (IOException e) {
1553 recheckMasterAddr = true;
1554 LOG.warn("error telling master we are up", e);
1555 } catch (KeeperException e) {
1556 recheckMasterAddr = true;
1557 LOG.warn("error putting up ephemeral node in zookeeper", e);
1558 }
1559 sleeper.sleep(lastMsg);
1560 }
1561 return result;
1562 }
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574 void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
1575 HRegionInfo newRegionB) {
1576 this.outboundMsgs.add(new HMsg(
1577 HMsg.Type.REGION_SPLIT, oldRegion, newRegionA,
1578 newRegionB, Bytes.toBytes("Daughters; "
1579 + newRegionA.getRegionNameAsString() + ", "
1580 + newRegionB.getRegionNameAsString())));
1581 }
1582
1583
1584
1585
1586
1587
1588 protected void closeAllRegions(final boolean abort) {
1589 closeUserRegions(abort);
1590
1591 HRegion meta = null;
1592 HRegion root = null;
1593 this.lock.writeLock().lock();
1594 try {
1595 for (Map.Entry<String, HRegion> e: onlineRegions.entrySet()) {
1596 HRegionInfo hri = e.getValue().getRegionInfo();
1597 if (hri.isRootRegion()) {
1598 root = e.getValue();
1599 } else if (hri.isMetaRegion()) {
1600 meta = e.getValue();
1601 }
1602 if (meta != null && root != null) break;
1603 }
1604 } finally {
1605 this.lock.writeLock().unlock();
1606 }
1607 if (meta != null) closeRegion(meta.getRegionInfo(), abort, false);
1608 if (root != null) closeRegion(root.getRegionInfo(), abort, false);
1609 }
1610
1611
1612
1613
1614
1615 void closeUserRegions(final boolean abort) {
1616 this.lock.writeLock().lock();
1617 try {
1618 for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
1619 HRegion r = e.getValue();
1620 if (!r.getRegionInfo().isMetaRegion()) {
1621
1622 closeRegion(r.getRegionInfo(), abort, false);
1623 }
1624 }
1625 } finally {
1626 this.lock.writeLock().unlock();
1627 }
1628 }
1629
1630 @Override
1631 @QosPriority(priority=HIGH_QOS)
1632 public HRegionInfo getRegionInfo(final byte[] regionName)
1633 throws NotServingRegionException, IOException {
1634 checkOpen();
1635 requestCount.incrementAndGet();
1636 return getRegion(regionName).getRegionInfo();
1637 }
1638
1639 public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
1640 final byte[] family) throws IOException {
1641 checkOpen();
1642 requestCount.incrementAndGet();
1643 try {
1644
1645 HRegion region = getRegion(regionName);
1646
1647
1648 Result r = region.getClosestRowBefore(row, family);
1649 return r;
1650 } catch (Throwable t) {
1651 throw convertThrowableToIOE(cleanup(t));
1652 }
1653 }
1654
1655
1656 public Result get(byte[] regionName, Get get) throws IOException {
1657 checkOpen();
1658 requestCount.incrementAndGet();
1659 try {
1660 HRegion region = getRegion(regionName);
1661 return region.get(get, getLockFromId(get.getLockId()));
1662 } catch (Throwable t) {
1663 throw convertThrowableToIOE(cleanup(t));
1664 }
1665 }
1666
1667 public boolean exists(byte[] regionName, Get get) throws IOException {
1668 checkOpen();
1669 requestCount.incrementAndGet();
1670 try {
1671 HRegion region = getRegion(regionName);
1672 Result r = region.get(get, getLockFromId(get.getLockId()));
1673 return r != null && !r.isEmpty();
1674 } catch (Throwable t) {
1675 throw convertThrowableToIOE(cleanup(t));
1676 }
1677 }
1678
1679 public void put(final byte[] regionName, final Put put) throws IOException {
1680 if (put.getRow() == null) {
1681 throw new IllegalArgumentException("update has null row");
1682 }
1683
1684 checkOpen();
1685 this.requestCount.incrementAndGet();
1686 HRegion region = getRegion(regionName);
1687 try {
1688 if (!region.getRegionInfo().isMetaTable()) {
1689 this.cacheFlusher.reclaimMemStoreMemory();
1690 }
1691 boolean writeToWAL = put.getWriteToWAL();
1692 region.put(put, getLockFromId(put.getLockId()), writeToWAL);
1693 } catch (Throwable t) {
1694 throw convertThrowableToIOE(cleanup(t));
1695 }
1696 }
1697
1698 public int put(final byte[] regionName, final List<Put> puts)
1699 throws IOException {
1700 checkOpen();
1701 HRegion region = null;
1702 try {
1703 region = getRegion(regionName);
1704 if (!region.getRegionInfo().isMetaTable()) {
1705 this.cacheFlusher.reclaimMemStoreMemory();
1706 }
1707
1708 @SuppressWarnings("unchecked")
1709 Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
1710
1711 int i = 0;
1712 for (Put p : puts) {
1713 Integer lock = getLockFromId(p.getLockId());
1714 putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
1715 }
1716
1717 this.requestCount.addAndGet(puts.size());
1718 OperationStatusCode[] codes = region.put(putsWithLocks);
1719 for (i = 0; i < codes.length; i++) {
1720 if (codes[i] != OperationStatusCode.SUCCESS) {
1721 return i;
1722 }
1723 }
1724 return -1;
1725 } catch (Throwable t) {
1726 throw convertThrowableToIOE(cleanup(t));
1727 }
1728 }
1729
1730 private boolean checkAndMutate(final byte[] regionName, final byte[] row,
1731 final byte[] family, final byte[] qualifier, final byte[] value,
1732 final Writable w, Integer lock) throws IOException {
1733 checkOpen();
1734 this.requestCount.incrementAndGet();
1735 HRegion region = getRegion(regionName);
1736 try {
1737 if (!region.getRegionInfo().isMetaTable()) {
1738 this.cacheFlusher.reclaimMemStoreMemory();
1739 }
1740 return region
1741 .checkAndMutate(row, family, qualifier, value, w, lock, true);
1742 } catch (Throwable t) {
1743 throw convertThrowableToIOE(cleanup(t));
1744 }
1745 }
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759 public boolean checkAndPut(final byte[] regionName, final byte[] row,
1760 final byte[] family, final byte[] qualifier, final byte[] value,
1761 final Put put) throws IOException {
1762 return checkAndMutate(regionName, row, family, qualifier, value, put,
1763 getLockFromId(put.getLockId()));
1764 }
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778 public boolean checkAndDelete(final byte[] regionName, final byte[] row,
1779 final byte[] family, final byte[] qualifier, final byte[] value,
1780 final Delete delete) throws IOException {
1781 return checkAndMutate(regionName, row, family, qualifier, value, delete,
1782 getLockFromId(delete.getLockId()));
1783 }
1784
1785
1786
1787
1788
1789 public long openScanner(byte[] regionName, Scan scan) throws IOException {
1790 checkOpen();
1791 NullPointerException npe = null;
1792 if (regionName == null) {
1793 npe = new NullPointerException("regionName is null");
1794 } else if (scan == null) {
1795 npe = new NullPointerException("scan is null");
1796 }
1797 if (npe != null) {
1798 throw new IOException("Invalid arguments to openScanner", npe);
1799 }
1800 requestCount.incrementAndGet();
1801 try {
1802 HRegion r = getRegion(regionName);
1803 return addScanner(r.getScanner(scan));
1804 } catch (Throwable t) {
1805 throw convertThrowableToIOE(cleanup(t, "Failed openScanner"));
1806 }
1807 }
1808
1809 protected long addScanner(InternalScanner s) throws LeaseStillHeldException {
1810 long scannerId = -1L;
1811 scannerId = rand.nextLong();
1812 String scannerName = String.valueOf(scannerId);
1813 scanners.put(scannerName, s);
1814 this.leases.createLease(scannerName, new ScannerListener(scannerName));
1815 return scannerId;
1816 }
1817
1818 public Result next(final long scannerId) throws IOException {
1819 Result[] res = next(scannerId, 1);
1820 if (res == null || res.length == 0) {
1821 return null;
1822 }
1823 return res[0];
1824 }
1825
1826 public Result[] next(final long scannerId, int nbRows) throws IOException {
1827 String scannerName = String.valueOf(scannerId);
1828 InternalScanner s = this.scanners.get(scannerName);
1829 if (s == null) throw new UnknownScannerException("Name: " + scannerName);
1830 try {
1831 checkOpen();
1832 } catch (IOException e) {
1833
1834
1835 try {
1836 this.leases.cancelLease(scannerName);
1837 } catch (LeaseException le) {
1838 LOG.info("Server shutting down and client tried to access missing scanner " +
1839 scannerName);
1840 }
1841 throw e;
1842 }
1843 Leases.Lease lease = null;
1844 try {
1845
1846
1847 lease = this.leases.removeLease(scannerName);
1848 List<Result> results = new ArrayList<Result>(nbRows);
1849 long currentScanResultSize = 0;
1850 List<KeyValue> values = new ArrayList<KeyValue>();
1851 for (int i = 0; i < nbRows
1852 && currentScanResultSize < maxScannerResultSize; i++) {
1853 requestCount.incrementAndGet();
1854
1855 boolean moreRows = s.next(values);
1856 if (!values.isEmpty()) {
1857 for (KeyValue kv : values) {
1858 currentScanResultSize += kv.heapSize();
1859 }
1860 results.add(new Result(values));
1861 }
1862 if (!moreRows) {
1863 break;
1864 }
1865 values.clear();
1866 }
1867
1868
1869
1870
1871
1872
1873 return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
1874 : results.toArray(new Result[0]);
1875 } catch (Throwable t) {
1876 if (t instanceof NotServingRegionException) {
1877 this.scanners.remove(scannerName);
1878 }
1879 throw convertThrowableToIOE(cleanup(t));
1880 } finally {
1881
1882
1883 if (this.scanners.containsKey(scannerName)) {
1884 if (lease != null) this.leases.addLease(lease);
1885 }
1886 }
1887 }
1888
1889 public void close(final long scannerId) throws IOException {
1890 try {
1891 checkOpen();
1892 requestCount.incrementAndGet();
1893 String scannerName = String.valueOf(scannerId);
1894 InternalScanner s = scanners.remove(scannerName);
1895 if (s != null) {
1896 s.close();
1897 this.leases.cancelLease(scannerName);
1898 }
1899 } catch (Throwable t) {
1900 throw convertThrowableToIOE(cleanup(t));
1901 }
1902 }
1903
1904
1905
1906
1907
1908 private class ScannerListener implements LeaseListener {
1909 private final String scannerName;
1910
1911 ScannerListener(final String n) {
1912 this.scannerName = n;
1913 }
1914
1915 public void leaseExpired() {
1916 LOG.info("Scanner " + this.scannerName + " lease expired");
1917 InternalScanner s = scanners.remove(this.scannerName);
1918 if (s != null) {
1919 try {
1920 s.close();
1921 } catch (IOException e) {
1922 LOG.error("Closing scanner", e);
1923 }
1924 }
1925 }
1926 }
1927
1928
1929
1930
1931 public void delete(final byte[] regionName, final Delete delete)
1932 throws IOException {
1933 checkOpen();
1934 try {
1935 boolean writeToWAL = true;
1936 this.requestCount.incrementAndGet();
1937 HRegion region = getRegion(regionName);
1938 if (!region.getRegionInfo().isMetaTable()) {
1939 this.cacheFlusher.reclaimMemStoreMemory();
1940 }
1941 Integer lid = getLockFromId(delete.getLockId());
1942 region.delete(delete, lid, writeToWAL);
1943 } catch (Throwable t) {
1944 throw convertThrowableToIOE(cleanup(t));
1945 }
1946 }
1947
1948 public int delete(final byte[] regionName, final List<Delete> deletes)
1949 throws IOException {
1950 checkOpen();
1951
1952 int i = 0;
1953 HRegion region = null;
1954 try {
1955 boolean writeToWAL = true;
1956 region = getRegion(regionName);
1957 if (!region.getRegionInfo().isMetaTable()) {
1958 this.cacheFlusher.reclaimMemStoreMemory();
1959 }
1960 int size = deletes.size();
1961 Integer[] locks = new Integer[size];
1962 for (Delete delete : deletes) {
1963 this.requestCount.incrementAndGet();
1964 locks[i] = getLockFromId(delete.getLockId());
1965 region.delete(delete, locks[i], writeToWAL);
1966 i++;
1967 }
1968 } catch (WrongRegionException ex) {
1969 LOG.debug("Batch deletes: " + i, ex);
1970 return i;
1971 } catch (NotServingRegionException ex) {
1972 return i;
1973 } catch (Throwable t) {
1974 throw convertThrowableToIOE(cleanup(t));
1975 }
1976 return -1;
1977 }
1978
1979 public long lockRow(byte[] regionName, byte[] row) throws IOException {
1980 checkOpen();
1981 NullPointerException npe = null;
1982 if (regionName == null) {
1983 npe = new NullPointerException("regionName is null");
1984 } else if (row == null) {
1985 npe = new NullPointerException("row to lock is null");
1986 }
1987 if (npe != null) {
1988 IOException io = new IOException("Invalid arguments to lockRow");
1989 io.initCause(npe);
1990 throw io;
1991 }
1992 requestCount.incrementAndGet();
1993 try {
1994 HRegion region = getRegion(regionName);
1995 Integer r = region.obtainRowLock(row);
1996 long lockId = addRowLock(r, region);
1997 LOG.debug("Row lock " + lockId + " explicitly acquired by client");
1998 return lockId;
1999 } catch (Throwable t) {
2000 throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
2001 + this.fsOk + ")"));
2002 }
2003 }
2004
2005 protected long addRowLock(Integer r, HRegion region)
2006 throws LeaseStillHeldException {
2007 long lockId = -1L;
2008 lockId = rand.nextLong();
2009 String lockName = String.valueOf(lockId);
2010 rowlocks.put(lockName, r);
2011 this.leases.createLease(lockName, new RowLockListener(lockName, region));
2012 return lockId;
2013 }
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025 Integer getLockFromId(long lockId) throws IOException {
2026 if (lockId == -1L) {
2027 return null;
2028 }
2029 String lockName = String.valueOf(lockId);
2030 Integer rl = rowlocks.get(lockName);
2031 if (rl == null) {
2032 throw new UnknownRowLockException("Invalid row lock");
2033 }
2034 this.leases.renewLease(lockName);
2035 return rl;
2036 }
2037
2038 @Override
2039 @QosPriority(priority=HIGH_QOS)
2040 public void unlockRow(byte[] regionName, long lockId) throws IOException {
2041 checkOpen();
2042 NullPointerException npe = null;
2043 if (regionName == null) {
2044 npe = new NullPointerException("regionName is null");
2045 } else if (lockId == -1L) {
2046 npe = new NullPointerException("lockId is null");
2047 }
2048 if (npe != null) {
2049 IOException io = new IOException("Invalid arguments to unlockRow");
2050 io.initCause(npe);
2051 throw io;
2052 }
2053 requestCount.incrementAndGet();
2054 try {
2055 HRegion region = getRegion(regionName);
2056 String lockName = String.valueOf(lockId);
2057 Integer r = rowlocks.remove(lockName);
2058 if (r == null) {
2059 throw new UnknownRowLockException(lockName);
2060 }
2061 region.releaseRowLock(r);
2062 this.leases.cancelLease(lockName);
2063 LOG.debug("Row lock " + lockId
2064 + " has been explicitly released by client");
2065 } catch (Throwable t) {
2066 throw convertThrowableToIOE(cleanup(t));
2067 }
2068 }
2069
2070 @Override
2071 public void bulkLoadHFile(String hfilePath, byte[] regionName,
2072 byte[] familyName) throws IOException {
2073 checkOpen();
2074 HRegion region = getRegion(regionName);
2075 region.bulkLoadHFile(hfilePath, familyName);
2076 }
2077
2078 Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
2079
2080
2081
2082
2083
2084 private class RowLockListener implements LeaseListener {
2085 private final String lockName;
2086 private final HRegion region;
2087
2088 RowLockListener(final String lockName, final HRegion region) {
2089 this.lockName = lockName;
2090 this.region = region;
2091 }
2092
2093 public void leaseExpired() {
2094 LOG.info("Row Lock " + this.lockName + " lease expired");
2095 Integer r = rowlocks.remove(this.lockName);
2096 if (r != null) {
2097 region.releaseRowLock(r);
2098 }
2099 }
2100 }
2101
2102
2103
2104 @Override
2105 @QosPriority(priority=HIGH_QOS)
2106 public void openRegion(HRegionInfo region)
2107 throws IOException {
2108 if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2109 throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
2110 }
2111 LOG.info("Received request to open region: " +
2112 region.getRegionNameAsString());
2113 if (this.stopped) throw new RegionServerStoppedException();
2114 if (region.isRootRegion()) {
2115 this.service.submit(new OpenRootHandler(this, this, region));
2116 } else if(region.isMetaRegion()) {
2117 this.service.submit(new OpenMetaHandler(this, this, region));
2118 } else {
2119 this.service.submit(new OpenRegionHandler(this, this, region));
2120 }
2121 }
2122
2123 @Override
2124 @QosPriority(priority=HIGH_QOS)
2125 public void openRegions(List<HRegionInfo> regions)
2126 throws IOException {
2127 LOG.info("Received request to open " + regions.size() + " region(s)");
2128 for (HRegionInfo region: regions) openRegion(region);
2129 }
2130
2131 @Override
2132 @QosPriority(priority=HIGH_QOS)
2133 public boolean closeRegion(HRegionInfo region)
2134 throws IOException {
2135 return closeRegion(region, true);
2136 }
2137
2138 @Override
2139 @QosPriority(priority=HIGH_QOS)
2140 public boolean closeRegion(HRegionInfo region, final boolean zk)
2141 throws IOException {
2142 LOG.info("Received close region: " + region.getRegionNameAsString());
2143 boolean hasit = this.onlineRegions.containsKey(region.getEncodedName());
2144 if (!hasit) {
2145 LOG.warn("Received close for region we are not serving; " +
2146 region.getEncodedName());
2147 throw new NotServingRegionException("Received close for "
2148 + region.getRegionNameAsString() + " but we are not serving it");
2149 }
2150 if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2151 throw new RegionAlreadyInTransitionException("close", region.getEncodedName());
2152 }
2153 return closeRegion(region, false, zk);
2154 }
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164 protected boolean closeRegion(HRegionInfo region, final boolean abort,
2165 final boolean zk) {
2166 if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
2167 LOG.warn("Received close for region we are already opening or closing; " +
2168 region.getEncodedName());
2169 return false;
2170 }
2171 CloseRegionHandler crh = null;
2172 if (region.isRootRegion()) {
2173 crh = new CloseRootHandler(this, this, region, abort, zk);
2174 } else if (region.isMetaRegion()) {
2175 crh = new CloseMetaHandler(this, this, region, abort, zk);
2176 } else {
2177 crh = new CloseRegionHandler(this, this, region, abort, zk);
2178 }
2179 this.service.submit(crh);
2180 return true;
2181 }
2182
2183
2184
2185 @Override
2186 @QosPriority(priority=HIGH_QOS)
2187 public void flushRegion(HRegionInfo regionInfo)
2188 throws NotServingRegionException, IOException {
2189 checkOpen();
2190 LOG.info("Flushing " + regionInfo.getRegionNameAsString());
2191 HRegion region = getRegion(regionInfo.getRegionName());
2192 region.flushcache();
2193 }
2194
2195 @Override
2196 @QosPriority(priority=HIGH_QOS)
2197 public void splitRegion(HRegionInfo regionInfo)
2198 throws NotServingRegionException, IOException {
2199 splitRegion(regionInfo, null);
2200 }
2201
2202 @Override
2203 public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
2204 throws NotServingRegionException, IOException {
2205 checkOpen();
2206 HRegion region = getRegion(regionInfo.getRegionName());
2207 region.flushcache();
2208 region.forceSplit(splitPoint);
2209
2210
2211
2212 compactSplitThread.requestCompaction(region, "User-triggered split",
2213 CompactSplitThread.PRIORITY_USER);
2214 }
2215
2216 @Override
2217 @QosPriority(priority=HIGH_QOS)
2218 public void compactRegion(HRegionInfo regionInfo, boolean major)
2219 throws NotServingRegionException, IOException {
2220 checkOpen();
2221 HRegion region = getRegion(regionInfo.getRegionName());
2222 compactSplitThread.requestCompaction(region, major, "User-triggered "
2223 + (major ? "major " : "") + "compaction",
2224 CompactSplitThread.PRIORITY_USER);
2225 }
2226
2227
2228 public InfoServer getInfoServer() {
2229 return infoServer;
2230 }
2231
2232
2233
2234
2235 public boolean isStopped() {
2236 return this.stopped;
2237 }
2238
2239 @Override
2240 public boolean isStopping() {
2241 return this.stopping;
2242 }
2243
2244
2245
2246
2247
2248 public Configuration getConfiguration() {
2249 return conf;
2250 }
2251
2252
2253 ReentrantReadWriteLock.WriteLock getWriteLock() {
2254 return lock.writeLock();
2255 }
2256
2257 @Override
2258 @QosPriority(priority=HIGH_QOS)
2259 public List<HRegionInfo> getOnlineRegions() {
2260 List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
2261 for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
2262 list.add(e.getValue().getRegionInfo());
2263 }
2264 Collections.sort(list);
2265 return list;
2266 }
2267
2268 public int getNumberOfOnlineRegions() {
2269 int size = -1;
2270 size = this.onlineRegions.size();
2271 return size;
2272 }
2273
2274 boolean isOnlineRegionsEmpty() {
2275 return this.onlineRegions.isEmpty();
2276 }
2277
2278
2279
2280
2281
2282
2283
2284 public Collection<HRegion> getOnlineRegionsLocalContext() {
2285 Collection<HRegion> regions = this.onlineRegions.values();
2286 return Collections.unmodifiableCollection(regions);
2287 }
2288
2289 @Override
2290 public void addToOnlineRegions(HRegion region) {
2291 this.onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
2292 }
2293
2294 @Override
2295 public boolean removeFromOnlineRegions(final String encodedName) {
2296 HRegion toReturn = null;
2297 toReturn = this.onlineRegions.remove(encodedName);
2298 return toReturn != null;
2299 }
2300
2301
2302
2303
2304
2305 public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
2306
2307 SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
2308 new Comparator<Long>() {
2309 public int compare(Long a, Long b) {
2310 return -1 * a.compareTo(b);
2311 }
2312 });
2313
2314 for (HRegion region : this.onlineRegions.values()) {
2315 sortedRegions.put(Long.valueOf(region.memstoreSize.get()), region);
2316 }
2317 return sortedRegions;
2318 }
2319
2320 @Override
2321 public HRegion getFromOnlineRegions(final String encodedRegionName) {
2322 HRegion r = null;
2323 r = this.onlineRegions.get(encodedRegionName);
2324 return r;
2325 }
2326
2327
2328
2329
2330
2331
2332 public HRegion getOnlineRegion(final byte[] regionName) {
2333 return getFromOnlineRegions(HRegionInfo.encodeRegionName(regionName));
2334 }
2335
2336
2337 public AtomicInteger getRequestCount() {
2338 return this.requestCount;
2339 }
2340
2341
2342 public FlushRequester getFlushRequester() {
2343 return this.cacheFlusher;
2344 }
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354 protected HRegion getRegion(final byte[] regionName)
2355 throws NotServingRegionException {
2356 HRegion region = null;
2357 region = getOnlineRegion(regionName);
2358 if (region == null) {
2359 throw new NotServingRegionException("Region is not online: " +
2360 Bytes.toStringBinary(regionName));
2361 }
2362 return region;
2363 }
2364
2365
2366
2367
2368
2369
2370
2371 protected HRegionInfo[] getMostLoadedRegions() {
2372 ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2373 for (HRegion r : onlineRegions.values()) {
2374 if (r.isClosed() || r.isClosing()) {
2375 continue;
2376 }
2377 if (regions.size() < numRegionsToReport) {
2378 regions.add(r.getRegionInfo());
2379 } else {
2380 break;
2381 }
2382 }
2383 return regions.toArray(new HRegionInfo[regions.size()]);
2384 }
2385
2386
2387
2388
2389
2390
2391 protected void checkOpen() throws IOException {
2392 if (this.stopped || this.abortRequested) {
2393 throw new IOException("Server not running"
2394 + (this.abortRequested ? ", aborting" : ""));
2395 }
2396 if (!fsOk) {
2397 throw new IOException("File system not available");
2398 }
2399 }
2400
2401
2402
2403
2404
2405 protected Set<HRegion> getRegionsToCheck() {
2406 HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
2407 regionsToCheck.addAll(this.onlineRegions.values());
2408
2409 for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
2410 HRegion r = i.next();
2411 if (r.isClosed()) {
2412 i.remove();
2413 }
2414 }
2415 return regionsToCheck;
2416 }
2417
2418 @Override
2419 @QosPriority(priority=HIGH_QOS)
2420 public long getProtocolVersion(final String protocol, final long clientVersion)
2421 throws IOException {
2422 if (protocol.equals(HRegionInterface.class.getName())) {
2423 return HBaseRPCProtocolVersion.versionID;
2424 }
2425 throw new IOException("Unknown protocol to name node: " + protocol);
2426 }
2427
2428
2429
2430
2431 protected LinkedBlockingQueue<HMsg> getOutboundMsgs() {
2432 return this.outboundMsgs;
2433 }
2434
2435
2436
2437
2438
2439
2440 public long getGlobalMemStoreSize() {
2441 long total = 0;
2442 for (HRegion region : onlineRegions.values()) {
2443 total += region.memstoreSize.get();
2444 }
2445 return total;
2446 }
2447
2448
2449
2450
2451 protected Leases getLeases() {
2452 return leases;
2453 }
2454
2455
2456
2457
2458 protected Path getRootDir() {
2459 return rootDir;
2460 }
2461
2462
2463
2464
2465 protected FileSystem getFileSystem() {
2466 return fs;
2467 }
2468
2469
2470
2471
2472 public HServerInfo getServerInfo() {
2473 return this.serverInfo;
2474 }
2475
2476
2477 @Override
2478 public Result increment(byte[] regionName, Increment increment)
2479 throws IOException {
2480 checkOpen();
2481 if (regionName == null) {
2482 throw new IOException("Invalid arguments to increment " +
2483 "regionName is null");
2484 }
2485 requestCount.incrementAndGet();
2486 try {
2487 HRegion region = getRegion(regionName);
2488 return region.increment(increment, getLockFromId(increment.getLockId()),
2489 increment.getWriteToWAL());
2490 } catch (IOException e) {
2491 checkFileSystem();
2492 throw e;
2493 }
2494 }
2495
2496
2497 public long incrementColumnValue(byte[] regionName, byte[] row,
2498 byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
2499 throws IOException {
2500 checkOpen();
2501
2502 if (regionName == null) {
2503 throw new IOException("Invalid arguments to incrementColumnValue "
2504 + "regionName is null");
2505 }
2506 requestCount.incrementAndGet();
2507 try {
2508 HRegion region = getRegion(regionName);
2509 long retval = region.incrementColumnValue(row, family, qualifier, amount,
2510 writeToWAL);
2511
2512 return retval;
2513 } catch (IOException e) {
2514 checkFileSystem();
2515 throw e;
2516 }
2517 }
2518
2519
2520 @Override
2521 @QosPriority(priority=HIGH_QOS)
2522 public HServerInfo getHServerInfo() throws IOException {
2523 checkOpen();
2524 return serverInfo;
2525 }
2526
2527 @SuppressWarnings("unchecked")
2528 @Override
2529 public MultiResponse multi(MultiAction multi) throws IOException {
2530 checkOpen();
2531 MultiResponse response = new MultiResponse();
2532
2533 for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
2534 byte[] regionName = e.getKey();
2535 List<Action> actionsForRegion = e.getValue();
2536
2537
2538
2539 Collections.sort(actionsForRegion);
2540 Row action;
2541 List<Action> puts = new ArrayList<Action>();
2542 for (Action a : actionsForRegion) {
2543 action = a.getAction();
2544 int originalIndex = a.getOriginalIndex();
2545
2546 try {
2547 if (action instanceof Delete) {
2548 delete(regionName, (Delete) action);
2549 response.add(regionName, originalIndex, new Result());
2550 } else if (action instanceof Get) {
2551 response.add(regionName, originalIndex, get(regionName, (Get) action));
2552 } else if (action instanceof Put) {
2553 puts.add(a);
2554 } else {
2555 LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
2556 throw new DoNotRetryIOException("Invalid Action, row must be a Get, Delete or Put.");
2557 }
2558 } catch (IOException ex) {
2559 response.add(regionName, originalIndex, ex);
2560 }
2561 }
2562
2563
2564
2565
2566 if (!puts.isEmpty()) {
2567 try {
2568 HRegion region = getRegion(regionName);
2569
2570 if (!region.getRegionInfo().isMetaTable()) {
2571 this.cacheFlusher.reclaimMemStoreMemory();
2572 }
2573
2574 List<Pair<Put,Integer>> putsWithLocks =
2575 Lists.newArrayListWithCapacity(puts.size());
2576 for (Action a : puts) {
2577 Put p = (Put) a.getAction();
2578
2579 Integer lock;
2580 try {
2581 lock = getLockFromId(p.getLockId());
2582 } catch (UnknownRowLockException ex) {
2583 response.add(regionName, a.getOriginalIndex(), ex);
2584 continue;
2585 }
2586 putsWithLocks.add(new Pair<Put, Integer>(p, lock));
2587 }
2588
2589 this.requestCount.addAndGet(puts.size());
2590
2591 OperationStatusCode[] codes =
2592 region.put(putsWithLocks.toArray(new Pair[]{}));
2593
2594 for( int i = 0 ; i < codes.length ; i++) {
2595 OperationStatusCode code = codes[i];
2596
2597 Action theAction = puts.get(i);
2598 Object result = null;
2599
2600 if (code == OperationStatusCode.SUCCESS) {
2601 result = new Result();
2602 } else if (code == OperationStatusCode.BAD_FAMILY) {
2603 result = new NoSuchColumnFamilyException();
2604 }
2605
2606
2607 response.add(regionName, theAction.getOriginalIndex(), result);
2608 }
2609 } catch (IOException ioe) {
2610
2611 for (Action a: puts) {
2612 response.add(regionName, a.getOriginalIndex(), ioe);
2613 }
2614 }
2615 }
2616 }
2617 return response;
2618 }
2619
2620
2621
2622
2623 @Override
2624 public MultiPutResponse multiPut(MultiPut puts) throws IOException {
2625 checkOpen();
2626 MultiPutResponse resp = new MultiPutResponse();
2627
2628
2629 for (Map.Entry<byte[], List<Put>> e : puts.puts.entrySet()) {
2630 int result = put(e.getKey(), e.getValue());
2631 resp.addResult(e.getKey(), result);
2632
2633 e.getValue().clear();
2634 }
2635
2636 return resp;
2637 }
2638
2639 public String toString() {
2640 return this.serverInfo.toString();
2641 }
2642
2643
2644
2645
2646
2647
2648 public int getThreadWakeFrequency() {
2649 return threadWakeFrequency;
2650 }
2651
2652 @Override
2653 public ZooKeeperWatcher getZooKeeper() {
2654 return zooKeeper;
2655 }
2656
2657 @Override
2658 public String getServerName() {
2659 return serverInfo.getServerName();
2660 }
2661
2662 @Override
2663 public CompactionRequestor getCompactionRequester() {
2664 return this.compactSplitThread;
2665 }
2666
2667 public Set<byte[]> getRegionsInTransitionInRS() {
2668 return this.regionsInTransitionInRS;
2669 }
2670
2671
2672
2673
2674
2675
2676
2677
2678
2679
2680 public static Thread startRegionServer(final HRegionServer hrs)
2681 throws IOException {
2682 return startRegionServer(hrs, "regionserver"
2683 + hrs.getServerInfo().getServerAddress().getPort());
2684 }
2685
2686
2687
2688
2689
2690
2691
2692 public static Thread startRegionServer(final HRegionServer hrs,
2693 final String name) throws IOException {
2694 Thread t = new Thread(hrs);
2695 t.setName(name);
2696 t.start();
2697
2698
2699 ShutdownHook.install(hrs.getConfiguration(), FileSystem.get(hrs
2700 .getConfiguration()), hrs, t);
2701 return t;
2702 }
2703
2704
2705
2706
2707
2708
2709
2710
2711 public static HRegionServer constructRegionServer(
2712 Class<? extends HRegionServer> regionServerClass,
2713 final Configuration conf2) {
2714 try {
2715 Constructor<? extends HRegionServer> c = regionServerClass
2716 .getConstructor(Configuration.class);
2717 return c.newInstance(conf2);
2718 } catch (Exception e) {
2719 throw new RuntimeException("Failed construction of " + "Regionserver: "
2720 + regionServerClass.toString(), e);
2721 }
2722 }
2723
2724 @Override
2725 public void replicateLogEntries(final HLog.Entry[] entries)
2726 throws IOException {
2727 checkOpen();
2728 if (this.replicationHandler == null) return;
2729 this.replicationHandler.replicateLogEntries(entries);
2730 }
2731
2732
2733
2734
2735
2736 public static void main(String[] args) throws Exception {
2737 Configuration conf = HBaseConfiguration.create();
2738 @SuppressWarnings("unchecked")
2739 Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
2740 .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
2741
2742 new HRegionServerCommandLine(regionServerClass).doMain(args);
2743 }
2744 }