1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import java.io.DataInput;
23 import java.io.DataOutput;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.Iterator;
28 import java.util.LinkedList;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.TreeMap;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.SynchronousQueue;
34 import java.util.concurrent.ThreadFactory;
35 import java.util.concurrent.ThreadPoolExecutor;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.DoNotRetryIOException;
43 import org.apache.hadoop.hbase.HBaseConfiguration;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HRegionLocation;
47 import org.apache.hadoop.hbase.HServerAddress;
48 import org.apache.hadoop.hbase.HTableDescriptor;
49 import org.apache.hadoop.hbase.KeyValue;
50 import org.apache.hadoop.hbase.NotServingRegionException;
51 import org.apache.hadoop.hbase.UnknownScannerException;
52 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
53 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
54 import org.apache.hadoop.hbase.util.Bytes;
55 import org.apache.hadoop.hbase.util.Pair;
56 import org.apache.hadoop.hbase.util.Writables;
57 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
58 import org.apache.zookeeper.KeeperException;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88 public class HTable implements HTableInterface {
89 private static final Log LOG = LogFactory.getLog(HTable.class);
90 private final HConnection connection;
91 private final byte [] tableName;
92 protected final int scannerTimeout;
93 private volatile Configuration configuration;
94 private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
95 private long writeBufferSize;
96 private boolean clearBufferOnFail;
97 private boolean autoFlush;
98 private long currentWriteBufferSize;
99 protected int scannerCaching;
100 private int maxKeyValueSize;
101 private ExecutorService pool;
102 private long maxScannerResultSize;
103
104
105
106
107
108
109
110
111
112
113
/**
 * Creates a table client for the named table, using a configuration freshly
 * obtained from {@link HBaseConfiguration#create()}.
 *
 * @param tableName name of the table; converted to bytes with
 *        {@link Bytes#toBytes(String)}
 * @throws IOException if the delegated constructor fails
 */
public HTable(final String tableName) throws IOException {
  this(HBaseConfiguration.create(), Bytes.toBytes(tableName));
}
118
119
120
121
122
123
124
125
126
127
128
129
/**
 * Creates a table client for the given table name, using a configuration
 * freshly obtained from {@link HBaseConfiguration#create()}.
 *
 * @param tableName name of the table, as bytes
 * @throws IOException if the delegated constructor fails
 */
public HTable(final byte [] tableName) throws IOException {
  this(HBaseConfiguration.create(), tableName);
}
134
135
136
137
138
139
140
141
142
143
144
/**
 * Creates a table client for the named table with the supplied configuration.
 *
 * @param conf configuration handed to the byte-array constructor
 * @param tableName name of the table; converted to bytes with
 *        {@link Bytes#toBytes(String)}
 * @throws IOException if the delegated constructor fails
 */
public HTable(Configuration conf, final String tableName) throws IOException {
  this(conf, Bytes.toBytes(tableName));
}
149
150
151
152
153
154
155
156
157
158
159
160
161 public HTable(Configuration conf, final byte [] tableName)
162 throws IOException {
163 this.tableName = tableName;
164 if (conf == null) {
165 this.scannerTimeout = 0;
166 this.connection = null;
167 return;
168 }
169 this.connection = HConnectionManager.getConnection(conf);
170 this.scannerTimeout =
171 (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
172 this.configuration = conf;
173 this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
174 this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152);
175 this.clearBufferOnFail = true;
176 this.autoFlush = true;
177 this.currentWriteBufferSize = 0;
178 this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
179
180 this.maxScannerResultSize = conf.getLong(
181 HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
182 HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
183 this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
184
185 int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
186 if (maxThreads == 0) {
187 maxThreads = 1;
188 }
189
190
191
192
193
194 this.pool = new ThreadPoolExecutor(1, maxThreads,
195 60, TimeUnit.SECONDS,
196 new SynchronousQueue<Runnable>(),
197 new DaemonThreadFactory());
198 ((ThreadPoolExecutor)this.pool).allowCoreThreadTimeOut(true);
199 }
200
201
202
203
204
205 public int getCurrentNrHRS() throws IOException {
206 try {
207
208
209 return ZKUtil.getNumberOfChildren(this.connection.getZooKeeperWatcher(),
210 this.connection.getZooKeeperWatcher().rsZNode);
211 } catch (KeeperException ke) {
212 throw new IOException("Unexpected ZooKeeper exception", ke);
213 }
214 }
215
216 public Configuration getConfiguration() {
217 return configuration;
218 }
219
220
221
222
223
224
225
226
227 public static boolean isTableEnabled(String tableName) throws IOException {
228 return isTableEnabled(Bytes.toBytes(tableName));
229 }
230
231
232
233
234
235
236
237
238 public static boolean isTableEnabled(byte[] tableName) throws IOException {
239 return isTableEnabled(HBaseConfiguration.create(), tableName);
240 }
241
242
243
244
245
246
247
248
249
250 public static boolean isTableEnabled(Configuration conf, String tableName)
251 throws IOException {
252 return isTableEnabled(conf, Bytes.toBytes(tableName));
253 }
254
255
256
257
258
259
260
261
262 public static boolean isTableEnabled(Configuration conf, byte[] tableName)
263 throws IOException {
264 return HConnectionManager.getConnection(conf).isTableEnabled(tableName);
265 }
266
267
268
269
270
271
272
273 public HRegionLocation getRegionLocation(final String row)
274 throws IOException {
275 return connection.getRegionLocation(tableName, Bytes.toBytes(row), false);
276 }
277
278
279
280
281
282
283
284 public HRegionLocation getRegionLocation(final byte [] row)
285 throws IOException {
286 return connection.getRegionLocation(tableName, row, false);
287 }
288
289 @Override
290 public byte [] getTableName() {
291 return this.tableName;
292 }
293
294
295
296
297
298
299
300 public HConnection getConnection() {
301 return this.connection;
302 }
303
304
305
306
307
308
309 public int getScannerCaching() {
310 return scannerCaching;
311 }
312
313
314
315
316
317
318
319
320
321
322
323 public void setScannerCaching(int scannerCaching) {
324 this.scannerCaching = scannerCaching;
325 }
326
327 @Override
328 public HTableDescriptor getTableDescriptor() throws IOException {
329 return new UnmodifyableHTableDescriptor(
330 this.connection.getHTableDescriptor(this.tableName));
331 }
332
333
334
335
336
337
338
339
340 public byte [][] getStartKeys() throws IOException {
341 return getStartEndKeys().getFirst();
342 }
343
344
345
346
347
348
349
350
351 public byte[][] getEndKeys() throws IOException {
352 return getStartEndKeys().getSecond();
353 }
354
355
356
357
358
359
360
361
362
363 @SuppressWarnings("unchecked")
364 public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
365 final List<byte[]> startKeyList = new ArrayList<byte[]>();
366 final List<byte[]> endKeyList = new ArrayList<byte[]>();
367 MetaScannerVisitor visitor = new MetaScannerVisitor() {
368 public boolean processRow(Result rowResult) throws IOException {
369 byte [] bytes = rowResult.getValue(HConstants.CATALOG_FAMILY,
370 HConstants.REGIONINFO_QUALIFIER);
371 if (bytes == null) {
372 LOG.warn("Null " + HConstants.REGIONINFO_QUALIFIER + " cell in " +
373 rowResult);
374 return true;
375 }
376 HRegionInfo info = Writables.getHRegionInfo(bytes);
377 if (Bytes.equals(info.getTableDesc().getName(), getTableName())) {
378 if (!(info.isOffline() || info.isSplit())) {
379 startKeyList.add(info.getStartKey());
380 endKeyList.add(info.getEndKey());
381 }
382 }
383 return true;
384 }
385 };
386 MetaScanner.metaScan(configuration, visitor, this.tableName);
387 return new Pair(startKeyList.toArray(new byte[startKeyList.size()][]),
388 endKeyList.toArray(new byte[endKeyList.size()][]));
389 }
390
391
392
393
394
395
396
397
398 public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
399 final Map<HRegionInfo, HServerAddress> regionMap =
400 new TreeMap<HRegionInfo, HServerAddress>();
401
402 MetaScannerVisitor visitor = new MetaScannerVisitor() {
403 public boolean processRow(Result rowResult) throws IOException {
404 HRegionInfo info = Writables.getHRegionInfo(
405 rowResult.getValue(HConstants.CATALOG_FAMILY,
406 HConstants.REGIONINFO_QUALIFIER));
407
408 if (!(Bytes.equals(info.getTableDesc().getName(), getTableName()))) {
409 return false;
410 }
411
412 HServerAddress server = new HServerAddress();
413 byte [] value = rowResult.getValue(HConstants.CATALOG_FAMILY,
414 HConstants.SERVER_QUALIFIER);
415 if (value != null && value.length > 0) {
416 String address = Bytes.toString(value);
417 server = new HServerAddress(address);
418 }
419
420 if (!(info.isOffline() || info.isSplit())) {
421 regionMap.put(new UnmodifyableHRegionInfo(info), server);
422 }
423 return true;
424 }
425
426 };
427 MetaScanner.metaScan(configuration, visitor, tableName);
428 return regionMap;
429 }
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453 public void prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap) {
454 this.connection.prewarmRegionCache(this.getTableName(), regionMap);
455 }
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477 public void serializeRegionInfo(DataOutput out) throws IOException {
478 Map<HRegionInfo, HServerAddress> allRegions = this.getRegionsInfo();
479
480 out.writeInt(allRegions.size());
481 for (Map.Entry<HRegionInfo, HServerAddress> es : allRegions.entrySet()) {
482 es.getKey().write(out);
483 es.getValue().write(out);
484 }
485 }
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503 public Map<HRegionInfo, HServerAddress> deserializeRegionInfo(DataInput in)
504 throws IOException {
505 final Map<HRegionInfo, HServerAddress> allRegions =
506 new TreeMap<HRegionInfo, HServerAddress>();
507
508
509 int regionsCount = in.readInt();
510 for (int i = 0; i < regionsCount; ++i) {
511 HRegionInfo hri = new HRegionInfo();
512 hri.readFields(in);
513 HServerAddress hsa = new HServerAddress();
514 hsa.readFields(in);
515 allRegions.put(hri, hsa);
516 }
517 return allRegions;
518 }
519
520 @Override
521 public Result getRowOrBefore(final byte[] row, final byte[] family)
522 throws IOException {
523 return connection.getRegionServerWithRetries(
524 new ServerCallable<Result>(connection, tableName, row) {
525 public Result call() throws IOException {
526 return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
527 row, family);
528 }
529 });
530 }
531
532 @Override
533 public ResultScanner getScanner(final Scan scan) throws IOException {
534 ClientScanner s = new ClientScanner(scan);
535 s.initialize();
536 return s;
537 }
538
539 @Override
540 public ResultScanner getScanner(byte [] family) throws IOException {
541 Scan scan = new Scan();
542 scan.addFamily(family);
543 return getScanner(scan);
544 }
545
546 @Override
547 public ResultScanner getScanner(byte [] family, byte [] qualifier)
548 throws IOException {
549 Scan scan = new Scan();
550 scan.addColumn(family, qualifier);
551 return getScanner(scan);
552 }
553
554 public Result get(final Get get) throws IOException {
555 return connection.getRegionServerWithRetries(
556 new ServerCallable<Result>(connection, tableName, get.getRow()) {
557 public Result call() throws IOException {
558 return server.get(location.getRegionInfo().getRegionName(), get);
559 }
560 }
561 );
562 }
563
564 public Result[] get(List<Get> gets) throws IOException {
565 try {
566 Object [] r1 = batch((List)gets);
567
568
569 Result [] results = new Result[r1.length];
570 int i=0;
571 for (Object o : r1) {
572
573 results[i++] = (Result) o;
574 }
575
576 return results;
577 } catch (InterruptedException e) {
578 throw new IOException(e);
579 }
580 }
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595 @Override
596 public synchronized void batch(final List<Row> actions, final Object[] results)
597 throws InterruptedException, IOException {
598 connection.processBatch(actions, tableName, pool, results);
599 }
600
601
602
603
604
605
606
607
608
609 @Override
610 public synchronized Object[] batch(final List<Row> actions) throws InterruptedException, IOException {
611 Object[] results = new Object[actions.size()];
612 connection.processBatch(actions, tableName, pool, results);
613 return results;
614 }
615
616
617
618
619
620
621
622
623 @Override
624 public void delete(final Delete delete)
625 throws IOException {
626 connection.getRegionServerWithRetries(
627 new ServerCallable<Boolean>(connection, tableName, delete.getRow()) {
628 public Boolean call() throws IOException {
629 server.delete(location.getRegionInfo().getRegionName(), delete);
630 return null;
631 }
632 }
633 );
634 }
635
636
637
638
639
640
641
642
643
644
645
646 @Override
647 public void delete(final List<Delete> deletes)
648 throws IOException {
649 Object[] results = new Object[deletes.size()];
650 try {
651 connection.processBatch((List) deletes, tableName, pool, results);
652 } catch (InterruptedException e) {
653 throw new IOException(e);
654 } finally {
655
656
657
658 for (int i = results.length - 1; i>=0; i--) {
659
660 if (results[i] instanceof Result) {
661 deletes.remove(i);
662 }
663 }
664 }
665 }
666
667 @Override
668 public void put(final Put put) throws IOException {
669 doPut(Arrays.asList(put));
670 }
671
672 @Override
673 public void put(final List<Put> puts) throws IOException {
674 doPut(puts);
675 }
676
677 private void doPut(final List<Put> puts) throws IOException {
678 for (Put put : puts) {
679 validatePut(put);
680 writeBuffer.add(put);
681 currentWriteBufferSize += put.heapSize();
682 }
683 if (autoFlush || currentWriteBufferSize > writeBufferSize) {
684 flushCommits();
685 }
686 }
687
688 @Override
689 public Result increment(final Increment increment) throws IOException {
690 if (!increment.hasFamilies()) {
691 throw new IOException(
692 "Invalid arguments to increment, no columns specified");
693 }
694 return connection.getRegionServerWithRetries(
695 new ServerCallable<Result>(connection, tableName, increment.getRow()) {
696 public Result call() throws IOException {
697 return server.increment(
698 location.getRegionInfo().getRegionName(), increment);
699 }
700 }
701 );
702 }
703
704 @Override
705 public long incrementColumnValue(final byte [] row, final byte [] family,
706 final byte [] qualifier, final long amount)
707 throws IOException {
708 return incrementColumnValue(row, family, qualifier, amount, true);
709 }
710
711 @Override
712 public long incrementColumnValue(final byte [] row, final byte [] family,
713 final byte [] qualifier, final long amount, final boolean writeToWAL)
714 throws IOException {
715 NullPointerException npe = null;
716 if (row == null) {
717 npe = new NullPointerException("row is null");
718 } else if (family == null) {
719 npe = new NullPointerException("column is null");
720 }
721 if (npe != null) {
722 throw new IOException(
723 "Invalid arguments to incrementColumnValue", npe);
724 }
725 return connection.getRegionServerWithRetries(
726 new ServerCallable<Long>(connection, tableName, row) {
727 public Long call() throws IOException {
728 return server.incrementColumnValue(
729 location.getRegionInfo().getRegionName(), row, family,
730 qualifier, amount, writeToWAL);
731 }
732 }
733 );
734 }
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749 @Override
750 public boolean checkAndPut(final byte [] row,
751 final byte [] family, final byte [] qualifier, final byte [] value,
752 final Put put)
753 throws IOException {
754 return connection.getRegionServerWithRetries(
755 new ServerCallable<Boolean>(connection, tableName, row) {
756 public Boolean call() throws IOException {
757 return server.checkAndPut(location.getRegionInfo().getRegionName(),
758 row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
759 }
760 }
761 );
762 }
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777 @Override
778 public boolean checkAndDelete(final byte [] row,
779 final byte [] family, final byte [] qualifier, final byte [] value,
780 final Delete delete)
781 throws IOException {
782 return connection.getRegionServerWithRetries(
783 new ServerCallable<Boolean>(connection, tableName, row) {
784 public Boolean call() throws IOException {
785 return server.checkAndDelete(
786 location.getRegionInfo().getRegionName(),
787 row, family, qualifier, value, delete)
788 ? Boolean.TRUE : Boolean.FALSE;
789 }
790 }
791 );
792 }
793
794
795
796
797
798
799
800
801
802
803
804
805 @Override
806 public boolean exists(final Get get) throws IOException {
807 return connection.getRegionServerWithRetries(
808 new ServerCallable<Boolean>(connection, tableName, get.getRow()) {
809 public Boolean call() throws IOException {
810 return server.
811 exists(location.getRegionInfo().getRegionName(), get);
812 }
813 }
814 );
815 }
816
817
818
819
820
821
822
823
824
825 @Override
826 public void flushCommits() throws IOException {
827 try {
828 connection.processBatchOfPuts(writeBuffer, tableName, pool);
829 } finally {
830 if (clearBufferOnFail) {
831 writeBuffer.clear();
832 currentWriteBufferSize = 0;
833 } else {
834
835 currentWriteBufferSize = 0;
836 for (Put aPut : writeBuffer) {
837 currentWriteBufferSize += aPut.heapSize();
838 }
839 }
840 }
841 }
842
843 @Override
844 public void close() throws IOException {
845 flushCommits();
846 this.pool.shutdown();
847 }
848
849
850 private void validatePut(final Put put) throws IllegalArgumentException{
851 if (put.isEmpty()) {
852 throw new IllegalArgumentException("No columns to insert");
853 }
854 if (maxKeyValueSize > 0) {
855 for (List<KeyValue> list : put.getFamilyMap().values()) {
856 for (KeyValue kv : list) {
857 if (kv.getLength() > maxKeyValueSize) {
858 throw new IllegalArgumentException("KeyValue size too large");
859 }
860 }
861 }
862 }
863 }
864
865 @Override
866 public RowLock lockRow(final byte [] row)
867 throws IOException {
868 return connection.getRegionServerWithRetries(
869 new ServerCallable<RowLock>(connection, tableName, row) {
870 public RowLock call() throws IOException {
871 long lockId =
872 server.lockRow(location.getRegionInfo().getRegionName(), row);
873 return new RowLock(row,lockId);
874 }
875 }
876 );
877 }
878
879 @Override
880 public void unlockRow(final RowLock rl)
881 throws IOException {
882 connection.getRegionServerWithRetries(
883 new ServerCallable<Boolean>(connection, tableName, rl.getRow()) {
884 public Boolean call() throws IOException {
885 server.unlockRow(location.getRegionInfo().getRegionName(),
886 rl.getLockId());
887 return null;
888 }
889 }
890 );
891 }
892
893
894
895
896
897 public void clearRegionCache() {
898 this.connection.clearRegionCache();
899 }
900
901 @Override
902 public boolean isAutoFlush() {
903 return autoFlush;
904 }
905
906
907
908
909
910
911
912 public void setAutoFlush(boolean autoFlush) {
913 setAutoFlush(autoFlush, autoFlush);
914 }
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943 public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
944 this.autoFlush = autoFlush;
945 this.clearBufferOnFail = autoFlush || clearBufferOnFail;
946 }
947
948
949
950
951
952
953
954
955 public long getWriteBufferSize() {
956 return writeBufferSize;
957 }
958
959
960
961
962
963
964
965
966
967 public void setWriteBufferSize(long writeBufferSize) throws IOException {
968 this.writeBufferSize = writeBufferSize;
969 if(currentWriteBufferSize > writeBufferSize) {
970 flushCommits();
971 }
972 }
973
974
975
976
977
978 public ArrayList<Put> getWriteBuffer() {
979 return writeBuffer;
980 }
981
982
983
984
985
986
987 protected class ClientScanner implements ResultScanner {
988 private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
989
990 private Scan scan;
991 private boolean closed = false;
992
993
994 private HRegionInfo currentRegion = null;
995 private ScannerCallable callable = null;
996 private final LinkedList<Result> cache = new LinkedList<Result>();
997 private final int caching;
998 private long lastNext;
999
1000 private Result lastResult = null;
1001
1002 protected ClientScanner(final Scan scan) {
1003 if (CLIENT_LOG.isDebugEnabled()) {
1004 CLIENT_LOG.debug("Creating scanner over "
1005 + Bytes.toString(getTableName())
1006 + " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
1007 }
1008 this.scan = scan;
1009 this.lastNext = System.currentTimeMillis();
1010
1011
1012 if (this.scan.getCaching() > 0) {
1013 this.caching = this.scan.getCaching();
1014 } else {
1015 this.caching = HTable.this.scannerCaching;
1016 }
1017
1018
1019
1020
1021
1022
1023
1024 }
1025
1026 public void initialize() throws IOException {
1027 nextScanner(this.caching, false);
1028 }
1029
1030 protected Scan getScan() {
1031 return scan;
1032 }
1033
1034 protected long getTimestamp() {
1035 return lastNext;
1036 }
1037
1038
1039 private boolean checkScanStopRow(final byte [] endKey) {
1040 if (this.scan.getStopRow().length > 0) {
1041
1042 byte [] stopRow = scan.getStopRow();
1043 int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
1044 endKey, 0, endKey.length);
1045 if (cmp <= 0) {
1046
1047
1048 return true;
1049 }
1050 }
1051 return false;
1052 }
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063 private boolean nextScanner(int nbRows, final boolean done)
1064 throws IOException {
1065
1066 if (this.callable != null) {
1067 this.callable.setClose();
1068 getConnection().getRegionServerWithRetries(callable);
1069 this.callable = null;
1070 }
1071
1072
1073 byte [] localStartKey;
1074
1075
1076 if (this.currentRegion != null) {
1077 byte [] endKey = this.currentRegion.getEndKey();
1078 if (endKey == null ||
1079 Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
1080 checkScanStopRow(endKey) ||
1081 done) {
1082 close();
1083 if (CLIENT_LOG.isDebugEnabled()) {
1084 CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
1085 }
1086 return false;
1087 }
1088 localStartKey = endKey;
1089 if (CLIENT_LOG.isDebugEnabled()) {
1090 CLIENT_LOG.debug("Finished with region " + this.currentRegion);
1091 }
1092 } else {
1093 localStartKey = this.scan.getStartRow();
1094 }
1095
1096 if (CLIENT_LOG.isDebugEnabled()) {
1097 CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
1098 Bytes.toStringBinary(localStartKey) + "'");
1099 }
1100 try {
1101 callable = getScannerCallable(localStartKey, nbRows);
1102
1103
1104 getConnection().getRegionServerWithRetries(callable);
1105 this.currentRegion = callable.getHRegionInfo();
1106 } catch (IOException e) {
1107 close();
1108 throw e;
1109 }
1110 return true;
1111 }
1112
1113 protected ScannerCallable getScannerCallable(byte [] localStartKey,
1114 int nbRows) {
1115 scan.setStartRow(localStartKey);
1116 ScannerCallable s = new ScannerCallable(getConnection(),
1117 getTableName(), scan);
1118 s.setCaching(nbRows);
1119 return s;
1120 }
1121
1122 public Result next() throws IOException {
1123
1124
1125 if (cache.size() == 0 && this.closed) {
1126 return null;
1127 }
1128 if (cache.size() == 0) {
1129 Result [] values = null;
1130 long remainingResultSize = maxScannerResultSize;
1131 int countdown = this.caching;
1132
1133
1134 callable.setCaching(this.caching);
1135
1136
1137 boolean skipFirst = false;
1138 do {
1139 try {
1140 if (skipFirst) {
1141
1142
1143 callable.setCaching(1);
1144 values = getConnection().getRegionServerWithRetries(callable);
1145 callable.setCaching(this.caching);
1146 skipFirst = false;
1147 }
1148
1149
1150
1151 values = getConnection().getRegionServerWithRetries(callable);
1152 } catch (DoNotRetryIOException e) {
1153 if (e instanceof UnknownScannerException) {
1154 long timeout = lastNext + scannerTimeout;
1155
1156
1157
1158 if (timeout < System.currentTimeMillis()) {
1159 long elapsed = System.currentTimeMillis() - lastNext;
1160 ScannerTimeoutException ex = new ScannerTimeoutException(
1161 elapsed + "ms passed since the last invocation, " +
1162 "timeout is currently set to " + scannerTimeout);
1163 ex.initCause(e);
1164 throw ex;
1165 }
1166 } else {
1167 Throwable cause = e.getCause();
1168 if (cause == null || !(cause instanceof NotServingRegionException)) {
1169 throw e;
1170 }
1171 }
1172
1173
1174 if (this.lastResult != null) {
1175 this.scan.setStartRow(this.lastResult.getRow());
1176
1177
1178 skipFirst = true;
1179 }
1180
1181 this.currentRegion = null;
1182 continue;
1183 }
1184 lastNext = System.currentTimeMillis();
1185 if (values != null && values.length > 0) {
1186 for (Result rs : values) {
1187 cache.add(rs);
1188 for (KeyValue kv : rs.raw()) {
1189 remainingResultSize -= kv.heapSize();
1190 }
1191 countdown--;
1192 this.lastResult = rs;
1193 }
1194 }
1195
1196 } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
1197 }
1198
1199 if (cache.size() > 0) {
1200 return cache.poll();
1201 }
1202 return null;
1203 }
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214 public Result [] next(int nbRows) throws IOException {
1215
1216 ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
1217 for(int i = 0; i < nbRows; i++) {
1218 Result next = next();
1219 if (next != null) {
1220 resultSets.add(next);
1221 } else {
1222 break;
1223 }
1224 }
1225 return resultSets.toArray(new Result[resultSets.size()]);
1226 }
1227
1228 public void close() {
1229 if (callable != null) {
1230 callable.setClose();
1231 try {
1232 getConnection().getRegionServerWithRetries(callable);
1233 } catch (IOException e) {
1234
1235
1236
1237
1238 }
1239 callable = null;
1240 }
1241 closed = true;
1242 }
1243
1244 public Iterator<Result> iterator() {
1245 return new Iterator<Result>() {
1246
1247 Result next = null;
1248
1249
1250
1251
1252
1253 public boolean hasNext() {
1254 if (next == null) {
1255 try {
1256 next = ClientScanner.this.next();
1257 return next != null;
1258 } catch (IOException e) {
1259 throw new RuntimeException(e);
1260 }
1261 }
1262 return true;
1263 }
1264
1265
1266
1267 public Result next() {
1268
1269
1270 if (!hasNext()) {
1271 return null;
1272 }
1273
1274
1275
1276
1277 Result temp = next;
1278 next = null;
1279 return temp;
1280 }
1281
1282 public void remove() {
1283 throw new UnsupportedOperationException();
1284 }
1285 };
1286 }
1287 }
1288
1289
1290
1291
1292
1293 ExecutorService getPool() {
1294 return this.pool;
1295 }
1296
1297 static class DaemonThreadFactory implements ThreadFactory {
1298 static final AtomicInteger poolNumber = new AtomicInteger(1);
1299 final ThreadGroup group;
1300 final AtomicInteger threadNumber = new AtomicInteger(1);
1301 final String namePrefix;
1302
1303 DaemonThreadFactory() {
1304 SecurityManager s = System.getSecurityManager();
1305 group = (s != null)? s.getThreadGroup() :
1306 Thread.currentThread().getThreadGroup();
1307 namePrefix = "pool-" +
1308 poolNumber.getAndIncrement() +
1309 "-thread-";
1310 }
1311
1312 public Thread newThread(Runnable r) {
1313 Thread t = new Thread(group, r,
1314 namePrefix + threadNumber.getAndIncrement(),
1315 0);
1316 if (!t.isDaemon()) {
1317 t.setDaemon(true);
1318 }
1319 if (t.getPriority() != Thread.NORM_PRIORITY) {
1320 t.setPriority(Thread.NORM_PRIORITY);
1321 }
1322 return t;
1323 }
1324 }
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335 public static void setRegionCachePrefetch(final byte[] tableName,
1336 boolean enable) throws ZooKeeperConnectionException {
1337 HConnectionManager.getConnection(HBaseConfiguration.create()).
1338 setRegionCachePrefetch(tableName, enable);
1339 }
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351 public static void setRegionCachePrefetch(final Configuration conf,
1352 final byte[] tableName, boolean enable) throws ZooKeeperConnectionException {
1353 HConnectionManager.getConnection(conf).setRegionCachePrefetch(
1354 tableName, enable);
1355 }
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365 public static boolean getRegionCachePrefetch(final Configuration conf,
1366 final byte[] tableName) throws ZooKeeperConnectionException {
1367 return HConnectionManager.getConnection(conf).getRegionCachePrefetch(
1368 tableName);
1369 }
1370
1371
1372
1373
1374
1375
1376
1377
1378 public static boolean getRegionCachePrefetch(final byte[] tableName) throws ZooKeeperConnectionException {
1379 return HConnectionManager.getConnection(HBaseConfiguration.create()).
1380 getRegionCachePrefetch(tableName);
1381 }
1382 }