1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.thrift;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.net.InetSocketAddress;
24 import java.net.UnknownHostException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.TreeMap;
31
32 import org.apache.commons.cli.CommandLine;
33 import org.apache.commons.cli.CommandLineParser;
34 import org.apache.commons.cli.HelpFormatter;
35 import org.apache.commons.cli.Option;
36 import org.apache.commons.cli.OptionGroup;
37 import org.apache.commons.cli.Options;
38 import org.apache.commons.cli.PosixParser;
39 import org.apache.commons.logging.Log;
40 import org.apache.commons.logging.LogFactory;
41 import org.apache.hadoop.conf.Configuration;
42 import org.apache.hadoop.hbase.HBaseConfiguration;
43 import org.apache.hadoop.hbase.HColumnDescriptor;
44 import org.apache.hadoop.hbase.HConstants;
45 import org.apache.hadoop.hbase.HRegionInfo;
46 import org.apache.hadoop.hbase.HServerAddress;
47 import org.apache.hadoop.hbase.HTableDescriptor;
48 import org.apache.hadoop.hbase.KeyValue;
49 import org.apache.hadoop.hbase.client.Delete;
50 import org.apache.hadoop.hbase.client.Get;
51 import org.apache.hadoop.hbase.client.HBaseAdmin;
52 import org.apache.hadoop.hbase.client.HTable;
53 import org.apache.hadoop.hbase.client.Put;
54 import org.apache.hadoop.hbase.client.Result;
55 import org.apache.hadoop.hbase.client.ResultScanner;
56 import org.apache.hadoop.hbase.client.Scan;
57 import org.apache.hadoop.hbase.filter.Filter;
58 import org.apache.hadoop.hbase.filter.PrefixFilter;
59 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
60 import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
61 import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
62 import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
63 import org.apache.hadoop.hbase.thrift.generated.Hbase;
64 import org.apache.hadoop.hbase.thrift.generated.IOError;
65 import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
66 import org.apache.hadoop.hbase.thrift.generated.Mutation;
67 import org.apache.hadoop.hbase.thrift.generated.TCell;
68 import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
69 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
70 import org.apache.hadoop.hbase.util.Bytes;
71 import org.apache.thrift.TException;
72 import org.apache.thrift.protocol.TBinaryProtocol;
73 import org.apache.thrift.protocol.TCompactProtocol;
74 import org.apache.thrift.protocol.TProtocolFactory;
75 import org.apache.thrift.server.THsHaServer;
76 import org.apache.thrift.server.TNonblockingServer;
77 import org.apache.thrift.server.TServer;
78 import org.apache.thrift.server.TThreadPoolServer;
79 import org.apache.thrift.transport.TFramedTransport;
80 import org.apache.thrift.transport.TNonblockingServerSocket;
81 import org.apache.thrift.transport.TNonblockingServerTransport;
82 import org.apache.thrift.transport.TServerSocket;
83 import org.apache.thrift.transport.TServerTransport;
84 import org.apache.thrift.transport.TTransportFactory;
85
86
87
88
89
90 public class ThriftServer {
91
92
93
94
95
96 public static class HBaseHandler implements Hbase.Iface {
97 protected Configuration conf;
98 protected HBaseAdmin admin = null;
99 protected final Log LOG = LogFactory.getLog(this.getClass().getName());
100
101
102 protected int nextScannerId = 0;
103 protected HashMap<Integer, ResultScanner> scannerMap = null;
104
105 private static ThreadLocal<Map<String, HTable>> threadLocalTables = new ThreadLocal<Map<String, HTable>>() {
106 @Override
107 protected Map<String, HTable> initialValue() {
108 return new TreeMap<String, HTable>();
109 }
110 };
111
112
113
114
115
116
117
118
119 byte[][] getAllColumns(HTable table) throws IOException {
120 HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
121 byte[][] columns = new byte[cds.length][];
122 for (int i = 0; i < cds.length; i++) {
123 columns[i] = Bytes.add(cds[i].getName(),
124 KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
125 }
126 return columns;
127 }
128
129
130
131
132
133
134
135
136
137
138 protected HTable getTable(final byte[] tableName) throws IOError,
139 IOException {
140 String table = new String(tableName);
141 Map<String, HTable> tables = threadLocalTables.get();
142 if (!tables.containsKey(table)) {
143 tables.put(table, new HTable(conf, tableName));
144 }
145 return tables.get(table);
146 }
147
148
149
150
151
152
153
154
155 protected synchronized int addScanner(ResultScanner scanner) {
156 int id = nextScannerId++;
157 scannerMap.put(id, scanner);
158 return id;
159 }
160
161
162
163
164
165
166
167 protected synchronized ResultScanner getScanner(int id) {
168 return scannerMap.get(id);
169 }
170
171
172
173
174
175
176
177
178 protected synchronized ResultScanner removeScanner(int id) {
179 return scannerMap.remove(id);
180 }
181
182
183
184
185
186 HBaseHandler()
187 throws IOException {
188 this(HBaseConfiguration.create());
189 }
190
191 HBaseHandler(final Configuration c)
192 throws IOException {
193 this.conf = c;
194 admin = new HBaseAdmin(conf);
195 scannerMap = new HashMap<Integer, ResultScanner>();
196 }
197
198 public void enableTable(final byte[] tableName) throws IOError {
199 try{
200 admin.enableTable(tableName);
201 } catch (IOException e) {
202 throw new IOError(e.getMessage());
203 }
204 }
205
206 public void disableTable(final byte[] tableName) throws IOError{
207 try{
208 admin.disableTable(tableName);
209 } catch (IOException e) {
210 throw new IOError(e.getMessage());
211 }
212 }
213
214 public boolean isTableEnabled(final byte[] tableName) throws IOError {
215 try {
216 return HTable.isTableEnabled(this.conf, tableName);
217 } catch (IOException e) {
218 throw new IOError(e.getMessage());
219 }
220 }
221
222 public void compact(byte[] tableNameOrRegionName) throws IOError {
223 try{
224 admin.compact(tableNameOrRegionName);
225 } catch (InterruptedException e) {
226 throw new IOError(e.getMessage());
227 } catch (IOException e) {
228 throw new IOError(e.getMessage());
229 }
230 }
231
232 public void majorCompact(byte[] tableNameOrRegionName) throws IOError {
233 try{
234 admin.majorCompact(tableNameOrRegionName);
235 } catch (InterruptedException e) {
236 throw new IOError(e.getMessage());
237 } catch (IOException e) {
238 throw new IOError(e.getMessage());
239 }
240 }
241
242 public List<byte[]> getTableNames() throws IOError {
243 try {
244 HTableDescriptor[] tables = this.admin.listTables();
245 ArrayList<byte[]> list = new ArrayList<byte[]>(tables.length);
246 for (int i = 0; i < tables.length; i++) {
247 list.add(tables[i].getName());
248 }
249 return list;
250 } catch (IOException e) {
251 throw new IOError(e.getMessage());
252 }
253 }
254
255 public List<TRegionInfo> getTableRegions(byte[] tableName)
256 throws IOError {
257 try{
258 HTable table = getTable(tableName);
259 Map<HRegionInfo, HServerAddress> regionsInfo = table.getRegionsInfo();
260 List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
261
262 for (HRegionInfo regionInfo : regionsInfo.keySet()){
263 TRegionInfo region = new TRegionInfo();
264 region.startKey = regionInfo.getStartKey();
265 region.endKey = regionInfo.getEndKey();
266 region.id = regionInfo.getRegionId();
267 region.name = regionInfo.getRegionName();
268 region.version = regionInfo.getVersion();
269 regions.add(region);
270 }
271 return regions;
272 } catch (IOException e){
273 throw new IOError(e.getMessage());
274 }
275 }
276
277 @Deprecated
278 public List<TCell> get(byte[] tableName, byte[] row, byte[] column)
279 throws IOError {
280 byte [][] famAndQf = KeyValue.parseColumn(column);
281 if(famAndQf.length == 1) {
282 return get(tableName, row, famAndQf[0], new byte[0]);
283 }
284 return get(tableName, row, famAndQf[0], famAndQf[1]);
285 }
286
287 public List<TCell> get(byte [] tableName, byte [] row, byte [] family,
288 byte [] qualifier) throws IOError {
289 try {
290 HTable table = getTable(tableName);
291 Get get = new Get(row);
292 if (qualifier == null || qualifier.length == 0) {
293 get.addFamily(family);
294 } else {
295 get.addColumn(family, qualifier);
296 }
297 Result result = table.get(get);
298 return ThriftUtilities.cellFromHBase(result.sorted());
299 } catch (IOException e) {
300 throw new IOError(e.getMessage());
301 }
302 }
303
304 @Deprecated
305 public List<TCell> getVer(byte[] tableName, byte[] row,
306 byte[] column, int numVersions) throws IOError {
307 byte [][] famAndQf = KeyValue.parseColumn(column);
308 if(famAndQf.length == 1) {
309 return getVer(tableName, row, famAndQf[0], new byte[0], numVersions);
310 }
311 return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions);
312 }
313
314 public List<TCell> getVer(byte [] tableName, byte [] row, byte [] family,
315 byte [] qualifier, int numVersions) throws IOError {
316 try {
317 HTable table = getTable(tableName);
318 Get get = new Get(row);
319 get.addColumn(family, qualifier);
320 get.setMaxVersions(numVersions);
321 Result result = table.get(get);
322 return ThriftUtilities.cellFromHBase(result.sorted());
323 } catch (IOException e) {
324 throw new IOError(e.getMessage());
325 }
326 }
327
328 @Deprecated
329 public List<TCell> getVerTs(byte[] tableName, byte[] row,
330 byte[] column, long timestamp, int numVersions) throws IOError {
331 byte [][] famAndQf = KeyValue.parseColumn(column);
332 if(famAndQf.length == 1) {
333 return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
334 numVersions);
335 }
336 return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
337 numVersions);
338 }
339
340 public List<TCell> getVerTs(byte [] tableName, byte [] row, byte [] family,
341 byte [] qualifier, long timestamp, int numVersions) throws IOError {
342 try {
343 HTable table = getTable(tableName);
344 Get get = new Get(row);
345 get.addColumn(family, qualifier);
346 get.setTimeRange(Long.MIN_VALUE, timestamp);
347 get.setMaxVersions(numVersions);
348 Result result = table.get(get);
349 return ThriftUtilities.cellFromHBase(result.sorted());
350 } catch (IOException e) {
351 throw new IOError(e.getMessage());
352 }
353 }
354
355 public List<TRowResult> getRow(byte[] tableName, byte[] row)
356 throws IOError {
357 return getRowWithColumnsTs(tableName, row, null,
358 HConstants.LATEST_TIMESTAMP);
359 }
360
361 public List<TRowResult> getRowWithColumns(byte[] tableName, byte[] row,
362 List<byte[]> columns) throws IOError {
363 return getRowWithColumnsTs(tableName, row, columns,
364 HConstants.LATEST_TIMESTAMP);
365 }
366
367 public List<TRowResult> getRowTs(byte[] tableName, byte[] row,
368 long timestamp) throws IOError {
369 return getRowWithColumnsTs(tableName, row, null,
370 timestamp);
371 }
372
373 public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
374 List<byte[]> columns, long timestamp) throws IOError {
375 try {
376 HTable table = getTable(tableName);
377 if (columns == null) {
378 Get get = new Get(row);
379 get.setTimeRange(Long.MIN_VALUE, timestamp);
380 Result result = table.get(get);
381 return ThriftUtilities.rowResultFromHBase(result);
382 }
383 byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
384 Get get = new Get(row);
385 for(byte [] column : columnArr) {
386 byte [][] famAndQf = KeyValue.parseColumn(column);
387 if (famAndQf.length == 1) {
388 get.addFamily(famAndQf[0]);
389 } else {
390 get.addColumn(famAndQf[0], famAndQf[1]);
391 }
392 }
393 get.setTimeRange(Long.MIN_VALUE, timestamp);
394 Result result = table.get(get);
395 return ThriftUtilities.rowResultFromHBase(result);
396 } catch (IOException e) {
397 throw new IOError(e.getMessage());
398 }
399 }
400
401 public List<TRowResult> getRows(byte[] tableName, List<byte[]> rows)
402 throws IOError {
403 return getRowsWithColumnsTs(tableName, rows, null,
404 HConstants.LATEST_TIMESTAMP);
405 }
406
407 public List<TRowResult> getRowsWithColumns(byte[] tableName, List<byte[]> rows,
408 List<byte[]> columns) throws IOError {
409 return getRowsWithColumnsTs(tableName, rows, columns,
410 HConstants.LATEST_TIMESTAMP);
411 }
412
413 public List<TRowResult> getRowsTs(byte[] tableName, List<byte[]> rows,
414 long timestamp) throws IOError {
415 return getRowsWithColumnsTs(tableName, rows, null,
416 timestamp);
417 }
418
419 public List<TRowResult> getRowsWithColumnsTs(byte[] tableName, List<byte[]> rows,
420 List<byte[]> columns, long timestamp) throws IOError {
421 try {
422 List<Get> gets = new ArrayList<Get>(rows.size());
423 HTable table = getTable(tableName);
424 for (byte[] row : rows) {
425 Get get = new Get(row);
426 if (columns != null) {
427 byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
428 for(byte [] column : columnArr) {
429 byte [][] famAndQf = KeyValue.parseColumn(column);
430 if (famAndQf.length == 1) {
431 get.addFamily(famAndQf[0]);
432 } else {
433 get.addColumn(famAndQf[0], famAndQf[1]);
434 }
435 }
436 get.setTimeRange(Long.MIN_VALUE, timestamp);
437 }
438 gets.add(get);
439 }
440 Result[] result = table.get(gets);
441 return ThriftUtilities.rowResultFromHBase(result);
442 } catch (IOException e) {
443 throw new IOError(e.getMessage());
444 }
445 }
446
447 public void deleteAll(byte[] tableName, byte[] row, byte[] column)
448 throws IOError {
449 deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
450 }
451
452 public void deleteAllTs(byte[] tableName, byte[] row, byte[] column,
453 long timestamp) throws IOError {
454 try {
455 HTable table = getTable(tableName);
456 Delete delete = new Delete(row);
457 byte [][] famAndQf = KeyValue.parseColumn(column);
458 if (famAndQf.length == 1) {
459 delete.deleteFamily(famAndQf[0], timestamp);
460 } else {
461 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
462 }
463 table.delete(delete);
464
465 } catch (IOException e) {
466 throw new IOError(e.getMessage());
467 }
468 }
469
470 public void deleteAllRow(byte[] tableName, byte[] row) throws IOError {
471 deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
472 }
473
474 public void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp)
475 throws IOError {
476 try {
477 HTable table = getTable(tableName);
478 Delete delete = new Delete(row, timestamp, null);
479 table.delete(delete);
480 } catch (IOException e) {
481 throw new IOError(e.getMessage());
482 }
483 }
484
485 public void createTable(byte[] tableName,
486 List<ColumnDescriptor> columnFamilies) throws IOError,
487 IllegalArgument, AlreadyExists {
488 try {
489 if (admin.tableExists(tableName)) {
490 throw new AlreadyExists("table name already in use");
491 }
492 HTableDescriptor desc = new HTableDescriptor(tableName);
493 for (ColumnDescriptor col : columnFamilies) {
494 HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
495 desc.addFamily(colDesc);
496 }
497 admin.createTable(desc);
498 } catch (IOException e) {
499 throw new IOError(e.getMessage());
500 } catch (IllegalArgumentException e) {
501 throw new IllegalArgument(e.getMessage());
502 }
503 }
504
505 public void deleteTable(byte[] tableName) throws IOError {
506 if (LOG.isDebugEnabled()) {
507 LOG.debug("deleteTable: table=" + new String(tableName));
508 }
509 try {
510 if (!admin.tableExists(tableName)) {
511 throw new IOError("table does not exist");
512 }
513 admin.deleteTable(tableName);
514 } catch (IOException e) {
515 throw new IOError(e.getMessage());
516 }
517 }
518
519 public void mutateRow(byte[] tableName, byte[] row,
520 List<Mutation> mutations) throws IOError, IllegalArgument {
521 mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
522 }
523
524 public void mutateRowTs(byte[] tableName, byte[] row,
525 List<Mutation> mutations, long timestamp) throws IOError, IllegalArgument {
526 HTable table = null;
527 try {
528 table = getTable(tableName);
529 Put put = new Put(row, timestamp, null);
530
531 Delete delete = new Delete(row);
532
533
534 for (Mutation m : mutations) {
535 byte[][] famAndQf = KeyValue.parseColumn(m.column);
536 if (m.isDelete) {
537 if (famAndQf.length == 1) {
538 delete.deleteFamily(famAndQf[0], timestamp);
539 } else {
540 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
541 }
542 } else {
543 if(famAndQf.length == 1) {
544 put.add(famAndQf[0], new byte[0], m.value);
545 } else {
546 put.add(famAndQf[0], famAndQf[1], m.value);
547 }
548 }
549 }
550 if (!delete.isEmpty())
551 table.delete(delete);
552 if (!put.isEmpty())
553 table.put(put);
554 } catch (IOException e) {
555 throw new IOError(e.getMessage());
556 } catch (IllegalArgumentException e) {
557 throw new IllegalArgument(e.getMessage());
558 }
559 }
560
561 public void mutateRows(byte[] tableName, List<BatchMutation> rowBatches)
562 throws IOError, IllegalArgument, TException {
563 mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
564 }
565
566 public void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp)
567 throws IOError, IllegalArgument, TException {
568 List<Put> puts = new ArrayList<Put>();
569 List<Delete> deletes = new ArrayList<Delete>();
570
571 for (BatchMutation batch : rowBatches) {
572 byte[] row = batch.row;
573 List<Mutation> mutations = batch.mutations;
574 Delete delete = new Delete(row);
575 Put put = new Put(row, timestamp, null);
576 for (Mutation m : mutations) {
577 byte[][] famAndQf = KeyValue.parseColumn(m.column);
578 if (m.isDelete) {
579
580 if (famAndQf.length == 1) {
581 delete.deleteFamily(famAndQf[0], timestamp);
582 } else {
583 delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
584 }
585 } else {
586 if(famAndQf.length == 1) {
587 put.add(famAndQf[0], new byte[0], m.value);
588 } else {
589 put.add(famAndQf[0], famAndQf[1], m.value);
590 }
591 }
592 }
593 if (!delete.isEmpty())
594 deletes.add(delete);
595 if (!put.isEmpty())
596 puts.add(put);
597 }
598
599 HTable table = null;
600 try {
601 table = getTable(tableName);
602 if (!puts.isEmpty())
603 table.put(puts);
604 for (Delete del : deletes) {
605 table.delete(del);
606 }
607 } catch (IOException e) {
608 throw new IOError(e.getMessage());
609 } catch (IllegalArgumentException e) {
610 throw new IllegalArgument(e.getMessage());
611 }
612 }
613
614 @Deprecated
615 public long atomicIncrement(byte[] tableName, byte[] row, byte[] column,
616 long amount) throws IOError, IllegalArgument, TException {
617 byte [][] famAndQf = KeyValue.parseColumn(column);
618 if(famAndQf.length == 1) {
619 return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
620 amount);
621 }
622 return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
623 }
624
625 public long atomicIncrement(byte [] tableName, byte [] row, byte [] family,
626 byte [] qualifier, long amount)
627 throws IOError, IllegalArgument, TException {
628 HTable table;
629 try {
630 table = getTable(tableName);
631 return table.incrementColumnValue(row, family, qualifier, amount);
632 } catch (IOException e) {
633 throw new IOError(e.getMessage());
634 }
635 }
636
637 public void scannerClose(int id) throws IOError, IllegalArgument {
638 LOG.debug("scannerClose: id=" + id);
639 ResultScanner scanner = getScanner(id);
640 if (scanner == null) {
641 throw new IllegalArgument("scanner ID is invalid");
642 }
643 scanner.close();
644 removeScanner(id);
645 }
646
647 public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
648 LOG.debug("scannerGetList: id=" + id);
649 ResultScanner scanner = getScanner(id);
650 if (null == scanner) {
651 throw new IllegalArgument("scanner ID is invalid");
652 }
653
654 Result [] results = null;
655 try {
656 results = scanner.next(nbRows);
657 if (null == results) {
658 return new ArrayList<TRowResult>();
659 }
660 } catch (IOException e) {
661 throw new IOError(e.getMessage());
662 }
663 return ThriftUtilities.rowResultFromHBase(results);
664 }
665 public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
666 return scannerGetList(id,1);
667 }
668 public int scannerOpen(byte[] tableName, byte[] startRow,
669 List<byte[]> columns) throws IOError {
670 try {
671 HTable table = getTable(tableName);
672 Scan scan = new Scan(startRow);
673 if(columns != null && columns.size() != 0) {
674 for(byte [] column : columns) {
675 byte [][] famQf = KeyValue.parseColumn(column);
676 if(famQf.length == 1) {
677 scan.addFamily(famQf[0]);
678 } else {
679 scan.addColumn(famQf[0], famQf[1]);
680 }
681 }
682 }
683 return addScanner(table.getScanner(scan));
684 } catch (IOException e) {
685 throw new IOError(e.getMessage());
686 }
687 }
688
689 public int scannerOpenWithStop(byte[] tableName, byte[] startRow,
690 byte[] stopRow, List<byte[]> columns) throws IOError, TException {
691 try {
692 HTable table = getTable(tableName);
693 Scan scan = new Scan(startRow, stopRow);
694 if(columns != null && columns.size() != 0) {
695 for(byte [] column : columns) {
696 byte [][] famQf = KeyValue.parseColumn(column);
697 if(famQf.length == 1) {
698 scan.addFamily(famQf[0]);
699 } else {
700 scan.addColumn(famQf[0], famQf[1]);
701 }
702 }
703 }
704 return addScanner(table.getScanner(scan));
705 } catch (IOException e) {
706 throw new IOError(e.getMessage());
707 }
708 }
709
710 @Override
711 public int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns) throws IOError, TException {
712 try {
713 HTable table = getTable(tableName);
714 Scan scan = new Scan(startAndPrefix);
715 Filter f = new WhileMatchFilter(
716 new PrefixFilter(startAndPrefix));
717 scan.setFilter(f);
718 if(columns != null && columns.size() != 0) {
719 for(byte [] column : columns) {
720 byte [][] famQf = KeyValue.parseColumn(column);
721 if(famQf.length == 1) {
722 scan.addFamily(famQf[0]);
723 } else {
724 scan.addColumn(famQf[0], famQf[1]);
725 }
726 }
727 }
728 return addScanner(table.getScanner(scan));
729 } catch (IOException e) {
730 throw new IOError(e.getMessage());
731 }
732 }
733
734 public int scannerOpenTs(byte[] tableName, byte[] startRow,
735 List<byte[]> columns, long timestamp) throws IOError, TException {
736 try {
737 HTable table = getTable(tableName);
738 Scan scan = new Scan(startRow);
739 scan.setTimeRange(Long.MIN_VALUE, timestamp);
740 if(columns != null && columns.size() != 0) {
741 for(byte [] column : columns) {
742 byte [][] famQf = KeyValue.parseColumn(column);
743 if(famQf.length == 1) {
744 scan.addFamily(famQf[0]);
745 } else {
746 scan.addColumn(famQf[0], famQf[1]);
747 }
748 }
749 }
750 return addScanner(table.getScanner(scan));
751 } catch (IOException e) {
752 throw new IOError(e.getMessage());
753 }
754 }
755
756 public int scannerOpenWithStopTs(byte[] tableName, byte[] startRow,
757 byte[] stopRow, List<byte[]> columns, long timestamp)
758 throws IOError, TException {
759 try {
760 HTable table = getTable(tableName);
761 Scan scan = new Scan(startRow, stopRow);
762 scan.setTimeRange(Long.MIN_VALUE, timestamp);
763 if(columns != null && columns.size() != 0) {
764 for(byte [] column : columns) {
765 byte [][] famQf = KeyValue.parseColumn(column);
766 if(famQf.length == 1) {
767 scan.addFamily(famQf[0]);
768 } else {
769 scan.addColumn(famQf[0], famQf[1]);
770 }
771 }
772 }
773 scan.setTimeRange(Long.MIN_VALUE, timestamp);
774 return addScanner(table.getScanner(scan));
775 } catch (IOException e) {
776 throw new IOError(e.getMessage());
777 }
778 }
779
780 public Map<byte[], ColumnDescriptor> getColumnDescriptors(
781 byte[] tableName) throws IOError, TException {
782 try {
783 TreeMap<byte[], ColumnDescriptor> columns =
784 new TreeMap<byte[], ColumnDescriptor>(Bytes.BYTES_COMPARATOR);
785
786 HTable table = getTable(tableName);
787 HTableDescriptor desc = table.getTableDescriptor();
788
789 for (HColumnDescriptor e : desc.getFamilies()) {
790 ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
791 columns.put(col.name, col);
792 }
793 return columns;
794 } catch (IOException e) {
795 throw new IOError(e.getMessage());
796 }
797 }
798 }
799
800
801
802
803
804 private static void printUsageAndExit(Options options, int exitCode) {
805 HelpFormatter formatter = new HelpFormatter();
806 formatter.printHelp("Thrift", null, options,
807 "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
808 "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" +
809 " send a kill signal to the thrift server pid",
810 true);
811 System.exit(exitCode);
812 }
813
814 private static final String DEFAULT_LISTEN_PORT = "9090";
815
816
817
818
819
820 static private void doMain(final String[] args) throws Exception {
821 Log LOG = LogFactory.getLog("ThriftServer");
822
823 Options options = new Options();
824 options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
825 options.addOption("p", "port", true, "Port to bind to [default: 9090]");
826 options.addOption("f", "framed", false, "Use framed transport");
827 options.addOption("c", "compact", false, "Use the compact protocol");
828 options.addOption("h", "help", false, "Print help information");
829
830 OptionGroup servers = new OptionGroup();
831 servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
832 servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
833 servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
834 options.addOptionGroup(servers);
835
836 CommandLineParser parser = new PosixParser();
837 CommandLine cmd = parser.parse(options, args);
838
839
840
841
842
843
844 List<String> commandLine = Arrays.asList(args);
845 boolean stop = commandLine.contains("stop");
846 boolean start = commandLine.contains("start");
847 if (cmd.hasOption("help") || !start || stop) {
848 printUsageAndExit(options, 1);
849 }
850
851
852 int listenPort = 0;
853 try {
854 listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
855 } catch (NumberFormatException e) {
856 LOG.error("Could not parse the value provided for the port option", e);
857 printUsageAndExit(options, -1);
858 }
859
860
861 TProtocolFactory protocolFactory;
862 if (cmd.hasOption("compact")) {
863 LOG.debug("Using compact protocol");
864 protocolFactory = new TCompactProtocol.Factory();
865 } else {
866 LOG.debug("Using binary protocol");
867 protocolFactory = new TBinaryProtocol.Factory();
868 }
869
870 HBaseHandler handler = new HBaseHandler();
871 Hbase.Processor processor = new Hbase.Processor(handler);
872
873 TServer server;
874 if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) {
875 if (cmd.hasOption("bind")) {
876 LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." +
877 " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
878 printUsageAndExit(options, -1);
879 }
880
881 TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort);
882 TFramedTransport.Factory transportFactory = new TFramedTransport.Factory();
883
884 if (cmd.hasOption("nonblocking")) {
885 LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort));
886 server = new TNonblockingServer(processor, serverTransport, transportFactory, protocolFactory);
887 } else {
888 LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort));
889 server = new THsHaServer(processor, serverTransport, transportFactory, protocolFactory);
890 }
891 } else {
892
893 InetAddress listenAddress = null;
894 if (cmd.hasOption("bind")) {
895 try {
896 listenAddress = InetAddress.getByName(cmd.getOptionValue("bind"));
897 } catch (UnknownHostException e) {
898 LOG.error("Could not bind to provided ip address", e);
899 printUsageAndExit(options, -1);
900 }
901 } else {
902 listenAddress = InetAddress.getByName("0.0.0.0");
903 }
904 TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort));
905
906
907 TTransportFactory transportFactory;
908 if (cmd.hasOption("framed")) {
909 transportFactory = new TFramedTransport.Factory();
910 LOG.debug("Using framed transport");
911 } else {
912 transportFactory = new TTransportFactory();
913 }
914
915 LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
916 server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory);
917 }
918
919 server.serve();
920 }
921
922
923
924
925
926 public static void main(String [] args) throws Exception {
927 doMain(args);
928 }
929 }