1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.IOException;
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.LinkedHashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.NoSuchElementException;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.TreeSet;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ConcurrentHashMap;
35 import java.util.concurrent.CopyOnWriteArraySet;
36 import java.util.concurrent.ExecutionException;
37 import java.util.concurrent.ExecutorService;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicInteger;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.conf.Configuration;
45 import org.apache.hadoop.hbase.DoNotRetryIOException;
46 import org.apache.hadoop.hbase.HConstants;
47 import org.apache.hadoop.hbase.HRegionInfo;
48 import org.apache.hadoop.hbase.HRegionLocation;
49 import org.apache.hadoop.hbase.HServerAddress;
50 import org.apache.hadoop.hbase.HTableDescriptor;
51 import org.apache.hadoop.hbase.KeyValue;
52 import org.apache.hadoop.hbase.MasterAddressTracker;
53 import org.apache.hadoop.hbase.MasterNotRunningException;
54 import org.apache.hadoop.hbase.RemoteExceptionHandler;
55 import org.apache.hadoop.hbase.TableNotFoundException;
56 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
57 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
58 import org.apache.hadoop.hbase.ipc.HBaseRPC;
59 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
60 import org.apache.hadoop.hbase.ipc.HMasterInterface;
61 import org.apache.hadoop.hbase.ipc.HRegionInterface;
62 import org.apache.hadoop.hbase.util.Bytes;
63 import org.apache.hadoop.hbase.util.Pair;
64 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
65 import org.apache.hadoop.hbase.util.Writables;
66 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
67 import org.apache.hadoop.hbase.zookeeper.ZKTable;
68 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
69 import org.apache.hadoop.ipc.RemoteException;
70 import org.apache.zookeeper.KeeperException;
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 @SuppressWarnings("serial")
120 public class HConnectionManager {
121 static final int MAX_CACHED_HBASE_INSTANCES = 2001;
122
123
124
125
126 private static final Map<Configuration, HConnectionImplementation> HBASE_INSTANCES =
127 new LinkedHashMap<Configuration, HConnectionImplementation>
128 ((int) (MAX_CACHED_HBASE_INSTANCES/0.75F)+1, 0.75F, true) {
129 @Override
130 protected boolean removeEldestEntry(Map.Entry<Configuration, HConnectionImplementation> eldest) {
131 return size() > MAX_CACHED_HBASE_INSTANCES;
132 }
133 };
134
135
136
137
138 protected HConnectionManager() {
139 super();
140 }
141
142
143
144
145
146
147
148
149
150
151 public static HConnection getConnection(Configuration conf)
152 throws ZooKeeperConnectionException {
153 HConnectionImplementation connection;
154 synchronized (HBASE_INSTANCES) {
155 connection = HBASE_INSTANCES.get(conf);
156 if (connection == null) {
157 connection = new HConnectionImplementation(conf);
158 HBASE_INSTANCES.put(conf, connection);
159 }
160 }
161 return connection;
162 }
163
164
165
166
167
168
169
170
171
172
173 public static void deleteConnection(Configuration conf, boolean stopProxy) {
174 synchronized (HBASE_INSTANCES) {
175 HConnectionImplementation t = HBASE_INSTANCES.remove(conf);
176 if (t != null) {
177 t.close(stopProxy);
178 }
179 }
180 }
181
182
183
184
185
186
187 public static void deleteAllConnections(boolean stopProxy) {
188 synchronized (HBASE_INSTANCES) {
189 for (HConnectionImplementation t : HBASE_INSTANCES.values()) {
190 if (t != null) {
191 t.close(stopProxy);
192 }
193 }
194 }
195 }
196
197
198
199
200
201
202
203 static int getCachedRegionCount(Configuration conf,
204 byte[] tableName)
205 throws ZooKeeperConnectionException {
206 HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
207 return connection.getNumberOfCachedRegionLocations(tableName);
208 }
209
210
211
212
213
214
215
216 static boolean isRegionCached(Configuration conf,
217 byte[] tableName, byte[] row) throws ZooKeeperConnectionException {
218 HConnectionImplementation connection = (HConnectionImplementation)getConnection(conf);
219 return connection.isRegionCached(tableName, row);
220 }
221
222
223 static class HConnectionImplementation implements HConnection {
224 static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
225 private final Class<? extends HRegionInterface> serverInterfaceClass;
226 private final long pause;
227 private final int numRetries;
228 private final int maxRPCAttempts;
229 private final int rpcTimeout;
230 private final int prefetchRegionLimit;
231
232 private final Object masterLock = new Object();
233 private volatile boolean closed;
234 private volatile HMasterInterface master;
235 private volatile boolean masterChecked;
236
237 private ZooKeeperWatcher zooKeeper;
238
239 private MasterAddressTracker masterAddressTracker;
240 private RootRegionTracker rootRegionTracker;
241
242 private final Object metaRegionLock = new Object();
243
244 private final Object userRegionLock = new Object();
245
246 private final Configuration conf;
247
248
249 private final Map<String, HRegionInterface> servers =
250 new ConcurrentHashMap<String, HRegionInterface>();
251 private final ConcurrentHashMap<String, String> connectionLock = new ConcurrentHashMap<String, String>();
252
253
254
255
256
257 private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>>
258 cachedRegionLocations =
259 new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
260
261
262
263 private final Set<Integer> regionCachePrefetchDisabledTables =
264 new CopyOnWriteArraySet<Integer>();
265
266
267
268
269
270 @SuppressWarnings("unchecked")
271 public HConnectionImplementation(Configuration conf)
272 throws ZooKeeperConnectionException {
273 this.conf = conf;
274 String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
275 HConstants.DEFAULT_REGION_SERVER_CLASS);
276 this.closed = false;
277 try {
278 this.serverInterfaceClass =
279 (Class<? extends HRegionInterface>) Class.forName(serverClassName);
280 } catch (ClassNotFoundException e) {
281 throw new UnsupportedOperationException(
282 "Unable to find region server interface " + serverClassName, e);
283 }
284
285 this.pause = conf.getLong("hbase.client.pause", 1000);
286 this.numRetries = conf.getInt("hbase.client.retries.number", 10);
287 this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1);
288 this.rpcTimeout = conf.getInt(
289 HConstants.HBASE_RPC_TIMEOUT_KEY,
290 HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
291
292 this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
293 10);
294
295 setupZookeeperTrackers();
296
297 this.master = null;
298 this.masterChecked = false;
299 }
300
301 private synchronized void setupZookeeperTrackers()
302 throws ZooKeeperConnectionException{
303
304 this.zooKeeper = getZooKeeperWatcher();
305 masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
306 masterAddressTracker.start();
307
308 this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
309 this.rootRegionTracker.start();
310 }
311
312 private synchronized void resetZooKeeperTrackers()
313 throws ZooKeeperConnectionException {
314 LOG.info("Trying to reconnect to zookeeper");
315 masterAddressTracker.stop();
316 masterAddressTracker = null;
317 rootRegionTracker.stop();
318 rootRegionTracker = null;
319 this.zooKeeper = null;
320 setupZookeeperTrackers();
321 }
322
323 public Configuration getConfiguration() {
324 return this.conf;
325 }
326
327 private long getPauseTime(int tries) {
328 int ntries = tries;
329 if (ntries >= HConstants.RETRY_BACKOFF.length) {
330 ntries = HConstants.RETRY_BACKOFF.length - 1;
331 }
332 return this.pause * HConstants.RETRY_BACKOFF[ntries];
333 }
334
335 public HMasterInterface getMaster()
336 throws MasterNotRunningException, ZooKeeperConnectionException {
337
338
339 if (master != null) {
340 if (master.isMasterRunning()) {
341 return master;
342 }
343 }
344
345 HServerAddress masterLocation = null;
346 synchronized (this.masterLock) {
347 for (int tries = 0;
348 !this.closed &&
349 !this.masterChecked && this.master == null &&
350 tries < numRetries;
351 tries++) {
352
353 try {
354 masterLocation = masterAddressTracker.getMasterAddress();
355 if(masterLocation == null) {
356 LOG.info("ZooKeeper available but no active master location found");
357 throw new MasterNotRunningException();
358 }
359
360 HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy(
361 HMasterInterface.class, HBaseRPCProtocolVersion.versionID,
362 masterLocation.getInetSocketAddress(), this.conf, this.rpcTimeout);
363
364 if (tryMaster.isMasterRunning()) {
365 this.master = tryMaster;
366 this.masterLock.notifyAll();
367 break;
368 }
369
370 } catch (IOException e) {
371 if (tries == numRetries - 1) {
372
373 LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
374 " failed; no more retrying.", e);
375 break;
376 }
377 LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
378 " failed; retrying after sleep of " +
379 getPauseTime(tries), e);
380 }
381
382
383 try {
384 this.masterLock.wait(getPauseTime(tries));
385 } catch (InterruptedException e) {
386 Thread.currentThread().interrupt();
387 throw new RuntimeException("Thread was interrupted while trying to connect to master.");
388 }
389 }
390 this.masterChecked = true;
391 }
392 if (this.master == null) {
393 if (masterLocation == null) {
394 throw new MasterNotRunningException();
395 }
396 throw new MasterNotRunningException(masterLocation.toString());
397 }
398 return this.master;
399 }
400
401 public boolean isMasterRunning()
402 throws MasterNotRunningException, ZooKeeperConnectionException {
403 if (this.master == null) {
404 getMaster();
405 }
406 boolean isRunning = master.isMasterRunning();
407 if(isRunning) {
408 return true;
409 }
410 throw new MasterNotRunningException();
411 }
412
413 public HRegionLocation getRegionLocation(final byte [] name,
414 final byte [] row, boolean reload)
415 throws IOException {
416 return reload? relocateRegion(name, row): locateRegion(name, row);
417 }
418
419 public HTableDescriptor[] listTables() throws IOException {
420 final TreeSet<HTableDescriptor> uniqueTables =
421 new TreeSet<HTableDescriptor>();
422 MetaScannerVisitor visitor = new MetaScannerVisitor() {
423 public boolean processRow(Result result) throws IOException {
424 try {
425 byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
426 HConstants.REGIONINFO_QUALIFIER);
427 HRegionInfo info = null;
428 if (value != null) {
429 info = Writables.getHRegionInfo(value);
430 }
431
432 if (info != null && info.getStartKey().length == 0) {
433 uniqueTables.add(info.getTableDesc());
434 }
435 return true;
436 } catch (RuntimeException e) {
437 LOG.error("Result=" + result);
438 throw e;
439 }
440 }
441 };
442 MetaScanner.metaScan(conf, visitor);
443 return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
444 }
445
446 public boolean isTableEnabled(byte[] tableName) throws IOException {
447 return testTableOnlineState(tableName, true);
448 }
449
450 public boolean isTableDisabled(byte[] tableName) throws IOException {
451 return testTableOnlineState(tableName, false);
452 }
453
454 public boolean isTableAvailable(final byte[] tableName) throws IOException {
455 final AtomicBoolean available = new AtomicBoolean(true);
456 final AtomicInteger regionCount = new AtomicInteger(0);
457 MetaScannerVisitor visitor = new MetaScannerVisitor() {
458 @Override
459 public boolean processRow(Result row) throws IOException {
460 byte[] value = row.getValue(HConstants.CATALOG_FAMILY,
461 HConstants.REGIONINFO_QUALIFIER);
462 HRegionInfo info = Writables.getHRegionInfoOrNull(value);
463 if (info != null) {
464 if (Bytes.equals(tableName, info.getTableDesc().getName())) {
465 value = row.getValue(HConstants.CATALOG_FAMILY,
466 HConstants.SERVER_QUALIFIER);
467 if (value == null) {
468 available.set(false);
469 return false;
470 }
471 regionCount.incrementAndGet();
472 }
473 }
474 return true;
475 }
476 };
477 MetaScanner.metaScan(conf, visitor);
478 return available.get() && (regionCount.get() > 0);
479 }
480
481
482
483
484 private boolean testTableOnlineState(byte [] tableName, boolean online)
485 throws IOException {
486 if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
487
488 return online;
489 }
490 String tableNameStr = Bytes.toString(tableName);
491 try {
492 if (online) {
493 return ZKTable.isEnabledTable(this.zooKeeper, tableNameStr);
494 }
495 return ZKTable.isDisabledTable(this.zooKeeper, tableNameStr);
496 } catch (KeeperException e) {
497 throw new IOException("Enable/Disable failed", e);
498 }
499 }
500
501 private static class HTableDescriptorFinder
502 implements MetaScanner.MetaScannerVisitor {
503 byte[] tableName;
504 HTableDescriptor result;
505 protected HTableDescriptorFinder(byte[] tableName) {
506 this.tableName = tableName;
507 }
508 public boolean processRow(Result rowResult) throws IOException {
509 HRegionInfo info = Writables.getHRegionInfoOrNull(
510 rowResult.getValue(HConstants.CATALOG_FAMILY,
511 HConstants.REGIONINFO_QUALIFIER));
512 if (info == null) return true;
513 HTableDescriptor desc = info.getTableDesc();
514 if (Bytes.compareTo(desc.getName(), tableName) == 0) {
515 result = desc;
516 return false;
517 }
518 return true;
519 }
520 HTableDescriptor getResult() {
521 return result;
522 }
523 }
524
525 public HTableDescriptor getHTableDescriptor(final byte[] tableName)
526 throws IOException {
527 if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
528 return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC);
529 }
530 if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
531 return HTableDescriptor.META_TABLEDESC;
532 }
533 HTableDescriptorFinder finder = new HTableDescriptorFinder(tableName);
534 MetaScanner.metaScan(conf, finder, tableName);
535 HTableDescriptor result = finder.getResult();
536 if (result == null) {
537 throw new TableNotFoundException(Bytes.toString(tableName));
538 }
539 return result;
540 }
541
542 @Override
543 public HRegionLocation locateRegion(final byte [] regionName)
544 throws IOException {
545
546 return null;
547 }
548
549 @Override
550 public List<HRegionLocation> locateRegions(final byte [] tableName)
551 throws IOException {
552
553 return null;
554 }
555
556 public HRegionLocation locateRegion(final byte [] tableName,
557 final byte [] row)
558 throws IOException{
559 return locateRegion(tableName, row, true);
560 }
561
562 public HRegionLocation relocateRegion(final byte [] tableName,
563 final byte [] row)
564 throws IOException{
565 return locateRegion(tableName, row, false);
566 }
567
568 private HRegionLocation locateRegion(final byte [] tableName,
569 final byte [] row, boolean useCache)
570 throws IOException {
571 if (this.closed) throw new IOException(toString() + " closed");
572 if (tableName == null || tableName.length == 0) {
573 throw new IllegalArgumentException(
574 "table name cannot be null or zero length");
575 }
576
577 if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
578 try {
579 HServerAddress hsa =
580 this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
581 LOG.debug("Lookedup root region location, connection=" + this +
582 "; hsa=" + hsa);
583 if (hsa == null) return null;
584 return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, hsa);
585 } catch (InterruptedException e) {
586 Thread.currentThread().interrupt();
587 return null;
588 }
589 } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
590 return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,
591 useCache, metaRegionLock);
592 } else {
593
594 return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,
595 useCache, userRegionLock);
596 }
597 }
598
599
600
601
602
603
604 private void prefetchRegionCache(final byte[] tableName,
605 final byte[] row) {
606
607
608 MetaScannerVisitor visitor = new MetaScannerVisitor() {
609 public boolean processRow(Result result) throws IOException {
610 try {
611 byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
612 HConstants.REGIONINFO_QUALIFIER);
613 HRegionInfo regionInfo = null;
614
615 if (value != null) {
616
617 regionInfo = Writables.getHRegionInfo(value);
618
619
620 if (!Bytes.equals(regionInfo.getTableDesc().getName(),
621 tableName)) {
622 return false;
623 }
624 if (regionInfo.isOffline()) {
625
626 return true;
627 }
628 value = result.getValue(HConstants.CATALOG_FAMILY,
629 HConstants.SERVER_QUALIFIER);
630 if (value == null) {
631 return true;
632 }
633 final String serverAddress = Bytes.toString(value);
634
635
636 HRegionLocation loc = new HRegionLocation(regionInfo,
637 new HServerAddress(serverAddress));
638
639 cacheLocation(tableName, loc);
640 }
641 return true;
642 } catch (RuntimeException e) {
643 throw new IOException(e);
644 }
645 }
646 };
647 try {
648
649 MetaScanner.metaScan(conf, visitor, tableName, row,
650 this.prefetchRegionLimit);
651 } catch (IOException e) {
652 LOG.warn("Encountered problems when prefetch META table: ", e);
653 }
654 }
655
656
657
658
659
660 private HRegionLocation locateRegionInMeta(final byte [] parentTable,
661 final byte [] tableName, final byte [] row, boolean useCache,
662 Object regionLockObject)
663 throws IOException {
664 HRegionLocation location;
665
666
667 if (useCache) {
668 location = getCachedLocation(tableName, row);
669 if (location != null) {
670 return location;
671 }
672 }
673
674
675
676
677 byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
678 HConstants.NINES, false);
679 for (int tries = 0; true; tries++) {
680 if (tries >= numRetries) {
681 throw new NoServerForRegionException("Unable to find region for "
682 + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
683 }
684
685 HRegionLocation metaLocation = null;
686 try {
687
688 metaLocation = locateRegion(parentTable, metaKey);
689
690 if (metaLocation == null) continue;
691 HRegionInterface server =
692 getHRegionConnection(metaLocation.getServerAddress());
693
694 Result regionInfoRow = null;
695
696
697
698 synchronized (regionLockObject) {
699
700
701 if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
702 (getRegionCachePrefetch(tableName)) ) {
703 prefetchRegionCache(tableName, row);
704 }
705
706
707
708
709
710 if (useCache) {
711 location = getCachedLocation(tableName, row);
712 if (location != null) {
713 return location;
714 }
715 } else {
716 deleteCachedLocation(tableName, row);
717 }
718
719
720 regionInfoRow = server.getClosestRowBefore(
721 metaLocation.getRegionInfo().getRegionName(), metaKey,
722 HConstants.CATALOG_FAMILY);
723 }
724 if (regionInfoRow == null) {
725 throw new TableNotFoundException(Bytes.toString(tableName));
726 }
727 byte[] value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
728 HConstants.REGIONINFO_QUALIFIER);
729 if (value == null || value.length == 0) {
730 throw new IOException("HRegionInfo was null or empty in " +
731 Bytes.toString(parentTable) + ", row=" + regionInfoRow);
732 }
733
734 HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
735 value, new HRegionInfo());
736
737 if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
738 throw new TableNotFoundException(
739 "Table '" + Bytes.toString(tableName) + "' was not found.");
740 }
741 if (regionInfo.isSplit()) {
742 throw new RegionOfflineException("the only available region for" +
743 " the required row is a split parent," +
744 " the daughters should be online soon: " +
745 regionInfo.getRegionNameAsString());
746 }
747 if (regionInfo.isOffline()) {
748 throw new RegionOfflineException("the region is offline, could" +
749 " be caused by a disable table call: " +
750 regionInfo.getRegionNameAsString());
751 }
752
753 value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
754 HConstants.SERVER_QUALIFIER);
755 String serverAddress = "";
756 if(value != null) {
757 serverAddress = Bytes.toString(value);
758 }
759 if (serverAddress.equals("")) {
760 throw new NoServerForRegionException("No server address listed " +
761 "in " + Bytes.toString(parentTable) + " for region " +
762 regionInfo.getRegionNameAsString());
763 }
764
765
766 location = new HRegionLocation(regionInfo,
767 new HServerAddress(serverAddress));
768 cacheLocation(tableName, location);
769 return location;
770 } catch (TableNotFoundException e) {
771
772
773
774 throw e;
775 } catch (IOException e) {
776 if (e instanceof RemoteException) {
777 e = RemoteExceptionHandler.decodeRemoteException(
778 (RemoteException) e);
779 }
780 if (tries < numRetries - 1) {
781 if (LOG.isDebugEnabled()) {
782 LOG.debug("locateRegionInMeta parentTable=" +
783 Bytes.toString(parentTable) + ", metaLocation=" +
784 ((metaLocation == null)? "null": metaLocation) + ", attempt=" +
785 tries + " of " +
786 this.numRetries + " failed; retrying after sleep of " +
787 getPauseTime(tries) + " because: " + e.getMessage());
788 }
789 } else {
790 throw e;
791 }
792
793 if(!(e instanceof RegionOfflineException ||
794 e instanceof NoServerForRegionException)) {
795 relocateRegion(parentTable, metaKey);
796 }
797 }
798 try{
799 Thread.sleep(getPauseTime(tries));
800 } catch (InterruptedException e) {
801 Thread.currentThread().interrupt();
802 throw new IOException("Giving up trying to location region in " +
803 "meta: thread is interrupted.");
804 }
805 }
806 }
807
808
809
810
811
812
813
814
815
816
817
818
819 HRegionLocation getCachedLocation(final byte [] tableName,
820 final byte [] row) {
821 SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
822 getTableLocations(tableName);
823
824
825
826 if (tableLocations.isEmpty()) {
827 return null;
828 }
829
830 HRegionLocation rl = tableLocations.get(row);
831 if (rl != null) {
832 return rl;
833 }
834
835
836
837 SoftValueSortedMap<byte[], HRegionLocation> matchingRegions =
838 tableLocations.headMap(row);
839
840
841
842
843 if (!matchingRegions.isEmpty()) {
844 HRegionLocation possibleRegion = null;
845 try {
846 possibleRegion = matchingRegions.get(matchingRegions.lastKey());
847 } catch (NoSuchElementException nsee) {
848 LOG.warn("checkReferences() might have removed the key", nsee);
849 }
850
851
852
853 if (possibleRegion != null) {
854 byte[] endKey = possibleRegion.getRegionInfo().getEndKey();
855
856
857
858
859
860
861 if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
862 KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
863 row, 0, row.length) > 0) {
864 return possibleRegion;
865 }
866 }
867 }
868
869
870 return null;
871 }
872
873
874
875
876
877
878 void deleteCachedLocation(final byte [] tableName, final byte [] row) {
879 synchronized (this.cachedRegionLocations) {
880 SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
881 getTableLocations(tableName);
882
883
884 if (!tableLocations.isEmpty()) {
885 HRegionLocation rl = getCachedLocation(tableName, row);
886 if (rl != null) {
887 tableLocations.remove(rl.getRegionInfo().getStartKey());
888 if (LOG.isDebugEnabled()) {
889 LOG.debug("Removed " +
890 rl.getRegionInfo().getRegionNameAsString() +
891 " for tableName=" + Bytes.toString(tableName) +
892 " from cache " + "because of " + Bytes.toStringBinary(row));
893 }
894 }
895 }
896 }
897 }
898
899
900
901
902
903 private SoftValueSortedMap<byte [], HRegionLocation> getTableLocations(
904 final byte [] tableName) {
905
906 Integer key = Bytes.mapKey(tableName);
907 SoftValueSortedMap<byte [], HRegionLocation> result;
908 synchronized (this.cachedRegionLocations) {
909 result = this.cachedRegionLocations.get(key);
910
911 if (result == null) {
912 result = new SoftValueSortedMap<byte [], HRegionLocation>(
913 Bytes.BYTES_COMPARATOR);
914 this.cachedRegionLocations.put(key, result);
915 }
916 }
917 return result;
918 }
919
920 @Override
921 public void clearRegionCache() {
922 synchronized(this.cachedRegionLocations) {
923 this.cachedRegionLocations.clear();
924 }
925 }
926
927 @Override
928 public void clearRegionCache(final byte [] tableName) {
929 synchronized (this.cachedRegionLocations) {
930 this.cachedRegionLocations.remove(Bytes.mapKey(tableName));
931 }
932 }
933
934
935
936
937 private void cacheLocation(final byte [] tableName,
938 final HRegionLocation location) {
939 byte [] startKey = location.getRegionInfo().getStartKey();
940 SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
941 getTableLocations(tableName);
942 if (tableLocations.put(startKey, location) == null) {
943 LOG.debug("Cached location for " +
944 location.getRegionInfo().getRegionNameAsString() +
945 " is " + location.getServerAddress());
946 }
947 }
948
949 public HRegionInterface getHRegionConnection(
950 HServerAddress regionServer, boolean getMaster)
951 throws IOException {
952 if (getMaster) {
953 getMaster();
954 }
955 HRegionInterface server;
956 String rsName = regionServer.toString();
957
958 server = this.servers.get(rsName);
959 if (server == null) {
960
961 this.connectionLock.putIfAbsent(rsName, rsName);
962
963 synchronized (this.connectionLock.get(rsName)) {
964
965 server = this.servers.get(rsName);
966 if (server == null) {
967 try {
968
969 server = (HRegionInterface) HBaseRPC.waitForProxy(
970 serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
971 regionServer.getInetSocketAddress(), this.conf,
972 this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
973 this.servers.put(rsName, server);
974 } catch (RemoteException e) {
975 LOG.warn("RemoteException connecting to RS", e);
976
977 throw RemoteExceptionHandler.decodeRemoteException(e);
978 }
979 }
980 }
981 }
982 return server;
983 }
984
985 public HRegionInterface getHRegionConnection(
986 HServerAddress regionServer)
987 throws IOException {
988 return getHRegionConnection(regionServer, false);
989 }
990
991
992
993
994
995
996
997
998 public synchronized ZooKeeperWatcher getZooKeeperWatcher()
999 throws ZooKeeperConnectionException {
1000 if(zooKeeper == null) {
1001 try {
1002 this.zooKeeper = new ZooKeeperWatcher(conf, "hconnection", this);
1003 } catch(ZooKeeperConnectionException zce) {
1004 throw zce;
1005 } catch (IOException e) {
1006 throw new ZooKeeperConnectionException("An error is preventing" +
1007 " HBase from connecting to ZooKeeper", e);
1008 }
1009 }
1010 return zooKeeper;
1011 }
1012
1013 public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
1014 throws IOException, RuntimeException {
1015 List<Throwable> exceptions = new ArrayList<Throwable>();
1016 for(int tries = 0; tries < numRetries; tries++) {
1017 try {
1018 callable.instantiateServer(tries != 0);
1019 return callable.call();
1020 } catch (Throwable t) {
1021 t = translateException(t);
1022 exceptions.add(t);
1023 if (tries == numRetries - 1) {
1024 throw new RetriesExhaustedException(callable.getServerName(),
1025 callable.getRegionName(), callable.getRow(), tries, exceptions);
1026 }
1027 }
1028 try {
1029 Thread.sleep(getPauseTime(tries));
1030 } catch (InterruptedException e) {
1031 Thread.currentThread().interrupt();
1032 throw new IOException("Giving up trying to get region server: thread is interrupted.");
1033 }
1034 }
1035 return null;
1036 }
1037
1038 public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
1039 throws IOException, RuntimeException {
1040 try {
1041 callable.instantiateServer(false);
1042 return callable.call();
1043 } catch (Throwable t) {
1044 Throwable t2 = translateException(t);
1045 if (t2 instanceof IOException) {
1046 throw (IOException)t2;
1047 } else {
1048 throw new RuntimeException(t2);
1049 }
1050 }
1051 }
1052
1053 void close(boolean stopProxy) {
1054 if (master != null) {
1055 if (stopProxy) {
1056 HBaseRPC.stopProxy(master);
1057 }
1058 master = null;
1059 masterChecked = false;
1060 }
1061 if (stopProxy) {
1062 for (HRegionInterface i: servers.values()) {
1063 HBaseRPC.stopProxy(i);
1064 }
1065 }
1066 if (this.zooKeeper != null) {
1067 LOG.info("Closed zookeeper sessionid=0x" +
1068 Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
1069 this.zooKeeper.close();
1070 this.zooKeeper = null;
1071 }
1072 this.closed = true;
1073 }
1074
1075 private Callable<MultiResponse> createCallable(
1076 final HServerAddress address,
1077 final MultiAction multi,
1078 final byte [] tableName) {
1079 final HConnection connection = this;
1080 return new Callable<MultiResponse>() {
1081 public MultiResponse call() throws IOException {
1082 return getRegionServerWithoutRetries(
1083 new ServerCallable<MultiResponse>(connection, tableName, null) {
1084 public MultiResponse call() throws IOException {
1085 return server.multi(multi);
1086 }
1087 @Override
1088 public void instantiateServer(boolean reload) throws IOException {
1089 server = connection.getHRegionConnection(address);
1090 }
1091 }
1092 );
1093 }
1094 };
1095 }
1096
1097 public void processBatch(List<Row> list,
1098 final byte[] tableName,
1099 ExecutorService pool,
1100 Object[] results) throws IOException, InterruptedException {
1101
1102
1103 if (results.length != list.size()) {
1104 throw new IllegalArgumentException("argument results must be the same size as argument list");
1105 }
1106
1107 if (list.size() == 0) {
1108 return;
1109 }
1110
1111
1112
1113 HServerAddress [] lastServers = new HServerAddress[results.length];
1114 List<Row> workingList = new ArrayList<Row>(list);
1115 boolean retry = true;
1116 Throwable singleRowCause = null;
1117
1118 for (int tries = 0; tries < numRetries && retry; ++tries) {
1119
1120
1121 if (tries >= 1) {
1122 long sleepTime = getPauseTime(tries);
1123 LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
1124 Thread.sleep(sleepTime);
1125 }
1126
1127
1128
1129 Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
1130 for (int i = 0; i < workingList.size(); i++) {
1131 Row row = workingList.get(i);
1132 if (row != null) {
1133 HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
1134 HServerAddress address = loc.getServerAddress();
1135 byte[] regionName = loc.getRegionInfo().getRegionName();
1136
1137 MultiAction actions = actionsByServer.get(address);
1138 if (actions == null) {
1139 actions = new MultiAction();
1140 actionsByServer.put(address, actions);
1141 }
1142
1143 Action action = new Action(regionName, row, i);
1144 lastServers[i] = address;
1145 actions.add(regionName, action);
1146 }
1147 }
1148
1149
1150
1151 Map<HServerAddress,Future<MultiResponse>> futures =
1152 new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
1153
1154 for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
1155 futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
1156 }
1157
1158
1159
1160 for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
1161 HServerAddress address = responsePerServer.getKey();
1162
1163 try {
1164 Future<MultiResponse> future = responsePerServer.getValue();
1165 MultiResponse resp = future.get();
1166
1167 if (resp == null) {
1168
1169 LOG.debug("Failed all for server: " + address + ", removing from cache");
1170 continue;
1171 }
1172
1173 for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
1174 byte[] regionName = e.getKey();
1175 List<Pair<Integer, Object>> regionResults = e.getValue();
1176 for (Pair<Integer, Object> regionResult : regionResults) {
1177 if (regionResult == null) {
1178
1179 LOG.debug("Failures for region: " +
1180 Bytes.toStringBinary(regionName) +
1181 ", removing from cache");
1182 } else {
1183
1184 results[regionResult.getFirst()] = regionResult.getSecond();
1185 }
1186 }
1187 }
1188 } catch (ExecutionException e) {
1189 LOG.debug("Failed all from " + address, e);
1190 }
1191 }
1192
1193
1194
1195
1196
1197 retry = false;
1198 workingList.clear();
1199 for (int i = 0; i < results.length; i++) {
1200
1201
1202 if (results[i] == null ||
1203 (results[i] instanceof Throwable &&
1204 !(results[i] instanceof DoNotRetryIOException))) {
1205
1206 retry = true;
1207
1208 Row row = list.get(i);
1209 workingList.add(row);
1210 deleteCachedLocation(tableName, row.getRow());
1211 } else {
1212
1213 workingList.add(null);
1214 }
1215 }
1216 }
1217
1218 if (retry) {
1219
1220 if (singleRowCause != null) {
1221 throw new IOException(singleRowCause);
1222 }
1223 }
1224
1225
1226 List<Throwable> exceptions = new ArrayList<Throwable>();
1227 List<Row> actions = new ArrayList<Row>();
1228 List<HServerAddress> addresses = new ArrayList<HServerAddress>();
1229
1230 for (int i = 0 ; i < results.length; i++) {
1231 if (results[i] == null || results[i] instanceof Throwable) {
1232 exceptions.add((Throwable)results[i]);
1233 actions.add(list.get(i));
1234 addresses.add(lastServers[i]);
1235 }
1236 }
1237
1238 if (!exceptions.isEmpty()) {
1239 throw new RetriesExhaustedWithDetailsException(exceptions,
1240 actions,
1241 addresses);
1242 }
1243 }
1244
1245
1246
1247
1248 public void processBatchOfPuts(List<Put> list,
1249 final byte[] tableName,
1250 ExecutorService pool) throws IOException {
1251 Object[] results = new Object[list.size()];
1252 try {
1253 processBatch((List) list, tableName, pool, results);
1254 } catch (InterruptedException e) {
1255 throw new IOException(e);
1256 } finally {
1257
1258
1259
1260
1261 for (int i = results.length - 1; i>=0; i--) {
1262 if (results[i] instanceof Result) {
1263
1264 list.remove(i);
1265 }
1266 }
1267 }
1268 }
1269
1270 private Throwable translateException(Throwable t) throws IOException {
1271 if (t instanceof UndeclaredThrowableException) {
1272 t = t.getCause();
1273 }
1274 if (t instanceof RemoteException) {
1275 t = RemoteExceptionHandler.decodeRemoteException((RemoteException)t);
1276 }
1277 if (t instanceof DoNotRetryIOException) {
1278 throw (DoNotRetryIOException)t;
1279 }
1280 return t;
1281 }
1282
1283
1284
1285
1286
1287 int getNumberOfCachedRegionLocations(final byte[] tableName) {
1288 Integer key = Bytes.mapKey(tableName);
1289 synchronized (this.cachedRegionLocations) {
1290 SoftValueSortedMap<byte[], HRegionLocation> tableLocs =
1291 this.cachedRegionLocations.get(key);
1292
1293 if (tableLocs == null) {
1294 return 0;
1295 }
1296 return tableLocs.values().size();
1297 }
1298 }
1299
1300
1301
1302
1303
1304
1305
1306
1307 boolean isRegionCached(final byte[] tableName, final byte[] row) {
1308 HRegionLocation location = getCachedLocation(tableName, row);
1309 return location != null;
1310 }
1311
1312 public void setRegionCachePrefetch(final byte[] tableName,
1313 final boolean enable) {
1314 if (!enable) {
1315 regionCachePrefetchDisabledTables.add(Bytes.mapKey(tableName));
1316 }
1317 else {
1318 regionCachePrefetchDisabledTables.remove(Bytes.mapKey(tableName));
1319 }
1320 }
1321
1322 public boolean getRegionCachePrefetch(final byte[] tableName) {
1323 return !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName));
1324 }
1325
1326 public void prewarmRegionCache(final byte[] tableName,
1327 final Map<HRegionInfo, HServerAddress> regions) {
1328 for (Map.Entry<HRegionInfo, HServerAddress> e : regions.entrySet()) {
1329 cacheLocation(tableName,
1330 new HRegionLocation(e.getKey(), e.getValue()));
1331 }
1332 }
1333
1334 @Override
1335 public void abort(final String msg, Throwable t) {
1336 if (t instanceof KeeperException.SessionExpiredException) {
1337 try {
1338 LOG.info("This client just lost it's session with ZooKeeper, trying" +
1339 " to reconnect.");
1340 resetZooKeeperTrackers();
1341 LOG.info("Reconnected successfully. This disconnect could have been" +
1342 " caused by a network partition or a long-running GC pause," +
1343 " either way it's recommended that you verify your environment.");
1344 return;
1345 } catch (ZooKeeperConnectionException e) {
1346 LOG.error("Could not reconnect to ZooKeeper after session" +
1347 " expiration, aborting");
1348 t = e;
1349 }
1350 }
1351 if (t != null) LOG.fatal(msg, t);
1352 else LOG.fatal(msg);
1353 this.closed = true;
1354 }
1355 }
1356 }