View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.thrift;
20  
21  import java.io.IOException;
22  import java.net.InetAddress;
23  import java.net.InetSocketAddress;
24  import java.net.UnknownHostException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.HashMap;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.TreeMap;
31  
32  import org.apache.commons.cli.CommandLine;
33  import org.apache.commons.cli.CommandLineParser;
34  import org.apache.commons.cli.HelpFormatter;
35  import org.apache.commons.cli.Option;
36  import org.apache.commons.cli.OptionGroup;
37  import org.apache.commons.cli.Options;
38  import org.apache.commons.cli.PosixParser;
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.hbase.HBaseConfiguration;
43  import org.apache.hadoop.hbase.HColumnDescriptor;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HServerAddress;
47  import org.apache.hadoop.hbase.HTableDescriptor;
48  import org.apache.hadoop.hbase.KeyValue;
49  import org.apache.hadoop.hbase.client.Delete;
50  import org.apache.hadoop.hbase.client.Get;
51  import org.apache.hadoop.hbase.client.HBaseAdmin;
52  import org.apache.hadoop.hbase.client.HTable;
53  import org.apache.hadoop.hbase.client.Put;
54  import org.apache.hadoop.hbase.client.Result;
55  import org.apache.hadoop.hbase.client.ResultScanner;
56  import org.apache.hadoop.hbase.client.Scan;
57  import org.apache.hadoop.hbase.filter.Filter;
58  import org.apache.hadoop.hbase.filter.PrefixFilter;
59  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
60  import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
61  import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
62  import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
63  import org.apache.hadoop.hbase.thrift.generated.Hbase;
64  import org.apache.hadoop.hbase.thrift.generated.IOError;
65  import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
66  import org.apache.hadoop.hbase.thrift.generated.Mutation;
67  import org.apache.hadoop.hbase.thrift.generated.TCell;
68  import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
69  import org.apache.hadoop.hbase.thrift.generated.TRowResult;
70  import org.apache.hadoop.hbase.util.Bytes;
71  import org.apache.thrift.TException;
72  import org.apache.thrift.protocol.TBinaryProtocol;
73  import org.apache.thrift.protocol.TCompactProtocol;
74  import org.apache.thrift.protocol.TProtocolFactory;
75  import org.apache.thrift.server.THsHaServer;
76  import org.apache.thrift.server.TNonblockingServer;
77  import org.apache.thrift.server.TServer;
78  import org.apache.thrift.server.TThreadPoolServer;
79  import org.apache.thrift.transport.TFramedTransport;
80  import org.apache.thrift.transport.TNonblockingServerSocket;
81  import org.apache.thrift.transport.TNonblockingServerTransport;
82  import org.apache.thrift.transport.TServerSocket;
83  import org.apache.thrift.transport.TServerTransport;
84  import org.apache.thrift.transport.TTransportFactory;
85  
86  /**
87   * ThriftServer - this class starts up a Thrift server which implements the
88   * Hbase API specified in the Hbase.thrift IDL file.
89   */
90  public class ThriftServer {
91  
92    /**
93     * The HBaseHandler is a glue object that connects Thrift RPC calls to the
94     * HBase client API primarily defined in the HBaseAdmin and HTable objects.
95     */
96    public static class HBaseHandler implements Hbase.Iface {
    // Configuration handed to HTable / HBaseAdmin instances created by this handler.
    protected Configuration conf;
    // Admin client used by the table-management calls; assigned in the constructor.
    protected HBaseAdmin admin = null;
    // Per-instance logger named after the runtime class of the handler.
    protected final Log LOG = LogFactory.getLog(this.getClass().getName());

    // nextScannerId and scannerMap are used to manage scanner state:
    // nextScannerId is the id handed to the next registered scanner, and
    // scannerMap maps issued ids to their open scanners. Both are only touched
    // from the synchronized addScanner/getScanner/removeScanner methods.
    protected int nextScannerId = 0;
    protected HashMap<Integer, ResultScanner> scannerMap = null;

    // Per-thread cache of HTable instances keyed by table name; each thread
    // starts with its own empty TreeMap (see initialValue below).
    private static ThreadLocal<Map<String, HTable>> threadLocalTables = new ThreadLocal<Map<String, HTable>>() {
      @Override
      protected Map<String, HTable> initialValue() {
        return new TreeMap<String, HTable>();
      }
    };
111 
112     /**
113      * Returns a list of all the column families for a given htable.
114      *
115      * @param table
116      * @return
117      * @throws IOException
118      */
119     byte[][] getAllColumns(HTable table) throws IOException {
120       HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
121       byte[][] columns = new byte[cds.length][];
122       for (int i = 0; i < cds.length; i++) {
123         columns[i] = Bytes.add(cds[i].getName(),
124             KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
125       }
126       return columns;
127     }
128 
129     /**
130      * Creates and returns an HTable instance from a given table name.
131      *
132      * @param tableName
133      *          name of table
134      * @return HTable object
135      * @throws IOException
136      * @throws IOError
137      */
138     protected HTable getTable(final byte[] tableName) throws IOError,
139         IOException {
140       String table = new String(tableName);
141       Map<String, HTable> tables = threadLocalTables.get();
142       if (!tables.containsKey(table)) {
143         tables.put(table, new HTable(conf, tableName));
144       }
145       return tables.get(table);
146     }
147 
148     /**
149      * Assigns a unique ID to the scanner and adds the mapping to an internal
150      * hash-map.
151      *
152      * @param scanner
153      * @return integer scanner id
154      */
155     protected synchronized int addScanner(ResultScanner scanner) {
156       int id = nextScannerId++;
157       scannerMap.put(id, scanner);
158       return id;
159     }
160 
161     /**
162      * Returns the scanner associated with the specified ID.
163      *
164      * @param id
165      * @return a Scanner, or null if ID was invalid.
166      */
167     protected synchronized ResultScanner getScanner(int id) {
168       return scannerMap.get(id);
169     }
170 
171     /**
172      * Removes the scanner associated with the specified ID from the internal
173      * id->scanner hash-map.
174      *
175      * @param id
176      * @return a Scanner, or null if ID was invalid.
177      */
178     protected synchronized ResultScanner removeScanner(int id) {
179       return scannerMap.remove(id);
180     }
181 
    /**
     * Constructs an HBaseHandler object using a configuration obtained from
     * {@code HBaseConfiguration.create()}.
     *
     * @throws IOException propagated from the delegated constructor
     */
    HBaseHandler()
    throws IOException {
      this(HBaseConfiguration.create());
    }
190 
191     HBaseHandler(final Configuration c)
192     throws IOException {
193       this.conf = c;
194       admin = new HBaseAdmin(conf);
195       scannerMap = new HashMap<Integer, ResultScanner>();
196     }
197 
198     public void enableTable(final byte[] tableName) throws IOError {
199       try{
200         admin.enableTable(tableName);
201       } catch (IOException e) {
202         throw new IOError(e.getMessage());
203       }
204     }
205 
206     public void disableTable(final byte[] tableName) throws IOError{
207       try{
208         admin.disableTable(tableName);
209       } catch (IOException e) {
210         throw new IOError(e.getMessage());
211       }
212     }
213 
214     public boolean isTableEnabled(final byte[] tableName) throws IOError {
215       try {
216         return HTable.isTableEnabled(this.conf, tableName);
217       } catch (IOException e) {
218         throw new IOError(e.getMessage());
219       }
220     }
221 
222     public void compact(byte[] tableNameOrRegionName) throws IOError {
223       try{
224         admin.compact(tableNameOrRegionName);
225       } catch (InterruptedException e) {
226         throw new IOError(e.getMessage());
227       } catch (IOException e) {
228         throw new IOError(e.getMessage());
229       }
230     }
231 
232     public void majorCompact(byte[] tableNameOrRegionName) throws IOError {
233       try{
234         admin.majorCompact(tableNameOrRegionName);
235       } catch (InterruptedException e) {
236         throw new IOError(e.getMessage());
237       } catch (IOException e) {
238         throw new IOError(e.getMessage());
239       }
240     }
241 
242     public List<byte[]> getTableNames() throws IOError {
243       try {
244         HTableDescriptor[] tables = this.admin.listTables();
245         ArrayList<byte[]> list = new ArrayList<byte[]>(tables.length);
246         for (int i = 0; i < tables.length; i++) {
247           list.add(tables[i].getName());
248         }
249         return list;
250       } catch (IOException e) {
251         throw new IOError(e.getMessage());
252       }
253     }
254 
255     public List<TRegionInfo> getTableRegions(byte[] tableName)
256     throws IOError {
257       try{
258         HTable table = getTable(tableName);
259         Map<HRegionInfo, HServerAddress> regionsInfo = table.getRegionsInfo();
260         List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
261 
262         for (HRegionInfo regionInfo : regionsInfo.keySet()){
263           TRegionInfo region = new TRegionInfo();
264           region.startKey = regionInfo.getStartKey();
265           region.endKey = regionInfo.getEndKey();
266           region.id = regionInfo.getRegionId();
267           region.name = regionInfo.getRegionName();
268           region.version = regionInfo.getVersion();
269           regions.add(region);
270         }
271         return regions;
272       } catch (IOException e){
273         throw new IOError(e.getMessage());
274       }
275     }
276 
277     @Deprecated
278     public List<TCell> get(byte[] tableName, byte[] row, byte[] column)
279         throws IOError {
280       byte [][] famAndQf = KeyValue.parseColumn(column);
281       if(famAndQf.length == 1) {
282         return get(tableName, row, famAndQf[0], new byte[0]);
283       }
284       return get(tableName, row, famAndQf[0], famAndQf[1]);
285     }
286 
287     public List<TCell> get(byte [] tableName, byte [] row, byte [] family,
288         byte [] qualifier) throws IOError {
289       try {
290         HTable table = getTable(tableName);
291         Get get = new Get(row);
292         if (qualifier == null || qualifier.length == 0) {
293           get.addFamily(family);
294         } else {
295           get.addColumn(family, qualifier);
296         }
297         Result result = table.get(get);
298         return ThriftUtilities.cellFromHBase(result.sorted());
299       } catch (IOException e) {
300         throw new IOError(e.getMessage());
301       }
302     }
303 
304     @Deprecated
305     public List<TCell> getVer(byte[] tableName, byte[] row,
306         byte[] column, int numVersions) throws IOError {
307       byte [][] famAndQf = KeyValue.parseColumn(column);
308       if(famAndQf.length == 1) {
309         return getVer(tableName, row, famAndQf[0], new byte[0], numVersions);
310       }
311       return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions);
312     }
313 
314     public List<TCell> getVer(byte [] tableName, byte [] row, byte [] family,
315         byte [] qualifier, int numVersions) throws IOError {
316       try {
317         HTable table = getTable(tableName);
318         Get get = new Get(row);
319         get.addColumn(family, qualifier);
320         get.setMaxVersions(numVersions);
321         Result result = table.get(get);
322         return ThriftUtilities.cellFromHBase(result.sorted());
323       } catch (IOException e) {
324         throw new IOError(e.getMessage());
325       }
326     }
327 
328     @Deprecated
329     public List<TCell> getVerTs(byte[] tableName, byte[] row,
330         byte[] column, long timestamp, int numVersions) throws IOError {
331       byte [][] famAndQf = KeyValue.parseColumn(column);
332       if(famAndQf.length == 1) {
333         return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
334             numVersions);
335       }
336       return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
337           numVersions);
338     }
339 
340     public List<TCell> getVerTs(byte [] tableName, byte [] row, byte [] family,
341         byte [] qualifier, long timestamp, int numVersions) throws IOError {
342       try {
343         HTable table = getTable(tableName);
344         Get get = new Get(row);
345         get.addColumn(family, qualifier);
346         get.setTimeRange(Long.MIN_VALUE, timestamp);
347         get.setMaxVersions(numVersions);
348         Result result = table.get(get);
349         return ThriftUtilities.cellFromHBase(result.sorted());
350       } catch (IOException e) {
351         throw new IOError(e.getMessage());
352       }
353     }
354 
355     public List<TRowResult> getRow(byte[] tableName, byte[] row)
356         throws IOError {
357       return getRowWithColumnsTs(tableName, row, null,
358                                  HConstants.LATEST_TIMESTAMP);
359     }
360 
361     public List<TRowResult> getRowWithColumns(byte[] tableName, byte[] row,
362         List<byte[]> columns) throws IOError {
363       return getRowWithColumnsTs(tableName, row, columns,
364                                  HConstants.LATEST_TIMESTAMP);
365     }
366 
367     public List<TRowResult> getRowTs(byte[] tableName, byte[] row,
368         long timestamp) throws IOError {
369       return getRowWithColumnsTs(tableName, row, null,
370                                  timestamp);
371     }
372 
373     public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
374         List<byte[]> columns, long timestamp) throws IOError {
375       try {
376         HTable table = getTable(tableName);
377         if (columns == null) {
378           Get get = new Get(row);
379           get.setTimeRange(Long.MIN_VALUE, timestamp);
380           Result result = table.get(get);
381           return ThriftUtilities.rowResultFromHBase(result);
382         }
383         byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
384         Get get = new Get(row);
385         for(byte [] column : columnArr) {
386           byte [][] famAndQf = KeyValue.parseColumn(column);
387           if (famAndQf.length == 1) {
388               get.addFamily(famAndQf[0]);
389           } else {
390               get.addColumn(famAndQf[0], famAndQf[1]);
391           }
392         }
393         get.setTimeRange(Long.MIN_VALUE, timestamp);
394         Result result = table.get(get);
395         return ThriftUtilities.rowResultFromHBase(result);
396       } catch (IOException e) {
397         throw new IOError(e.getMessage());
398       }
399     }
400 
401     public List<TRowResult> getRows(byte[] tableName, List<byte[]> rows)
402         throws IOError {
403       return getRowsWithColumnsTs(tableName, rows, null,
404                                   HConstants.LATEST_TIMESTAMP);
405     }
406 
407     public List<TRowResult> getRowsWithColumns(byte[] tableName, List<byte[]> rows,
408         List<byte[]> columns) throws IOError {
409       return getRowsWithColumnsTs(tableName, rows, columns,
410                                   HConstants.LATEST_TIMESTAMP);
411     }
412 
413     public List<TRowResult> getRowsTs(byte[] tableName, List<byte[]> rows,
414         long timestamp) throws IOError {
415       return getRowsWithColumnsTs(tableName, rows, null,
416                                   timestamp);
417     }
418 
419     public List<TRowResult> getRowsWithColumnsTs(byte[] tableName, List<byte[]> rows,
420         List<byte[]> columns, long timestamp) throws IOError {
421       try {
422         List<Get> gets = new ArrayList<Get>(rows.size());
423         HTable table = getTable(tableName);
424         for (byte[] row : rows) {
425           Get get = new Get(row);
426           if (columns != null) {
427             byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
428             for(byte [] column : columnArr) {
429               byte [][] famAndQf = KeyValue.parseColumn(column);
430               if (famAndQf.length == 1) {
431                 get.addFamily(famAndQf[0]);
432               } else {
433                 get.addColumn(famAndQf[0], famAndQf[1]);
434               }
435             }
436             get.setTimeRange(Long.MIN_VALUE, timestamp);
437           }
438           gets.add(get);
439         }
440         Result[] result = table.get(gets);
441         return ThriftUtilities.rowResultFromHBase(result);
442       } catch (IOException e) {
443         throw new IOError(e.getMessage());
444       }
445     }
446 
447     public void deleteAll(byte[] tableName, byte[] row, byte[] column)
448         throws IOError {
449       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
450     }
451 
452     public void deleteAllTs(byte[] tableName, byte[] row, byte[] column,
453         long timestamp) throws IOError {
454       try {
455         HTable table = getTable(tableName);
456         Delete delete  = new Delete(row);
457         byte [][] famAndQf = KeyValue.parseColumn(column);
458         if (famAndQf.length == 1) {
459           delete.deleteFamily(famAndQf[0], timestamp);
460         } else {
461           delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
462         }
463         table.delete(delete);
464 
465       } catch (IOException e) {
466         throw new IOError(e.getMessage());
467       }
468     }
469 
470     public void deleteAllRow(byte[] tableName, byte[] row) throws IOError {
471       deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
472     }
473 
474     public void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp)
475         throws IOError {
476       try {
477         HTable table = getTable(tableName);
478         Delete delete  = new Delete(row, timestamp, null);
479         table.delete(delete);
480       } catch (IOException e) {
481         throw new IOError(e.getMessage());
482       }
483     }
484 
485     public void createTable(byte[] tableName,
486         List<ColumnDescriptor> columnFamilies) throws IOError,
487         IllegalArgument, AlreadyExists {
488       try {
489         if (admin.tableExists(tableName)) {
490           throw new AlreadyExists("table name already in use");
491         }
492         HTableDescriptor desc = new HTableDescriptor(tableName);
493         for (ColumnDescriptor col : columnFamilies) {
494           HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
495           desc.addFamily(colDesc);
496         }
497         admin.createTable(desc);
498       } catch (IOException e) {
499         throw new IOError(e.getMessage());
500       } catch (IllegalArgumentException e) {
501         throw new IllegalArgument(e.getMessage());
502       }
503     }
504 
505     public void deleteTable(byte[] tableName) throws IOError {
506       if (LOG.isDebugEnabled()) {
507         LOG.debug("deleteTable: table=" + new String(tableName));
508       }
509       try {
510         if (!admin.tableExists(tableName)) {
511           throw new IOError("table does not exist");
512         }
513         admin.deleteTable(tableName);
514       } catch (IOException e) {
515         throw new IOError(e.getMessage());
516       }
517     }
518 
519     public void mutateRow(byte[] tableName, byte[] row,
520         List<Mutation> mutations) throws IOError, IllegalArgument {
521       mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
522     }
523 
524     public void mutateRowTs(byte[] tableName, byte[] row,
525         List<Mutation> mutations, long timestamp) throws IOError, IllegalArgument {
526       HTable table = null;
527       try {
528         table = getTable(tableName);
529         Put put = new Put(row, timestamp, null);
530 
531         Delete delete = new Delete(row);
532 
533         // I apologize for all this mess :)
534         for (Mutation m : mutations) {
535           byte[][] famAndQf = KeyValue.parseColumn(m.column);
536           if (m.isDelete) {
537             if (famAndQf.length == 1) {
538               delete.deleteFamily(famAndQf[0], timestamp);
539             } else {
540               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
541             }
542           } else {
543             if(famAndQf.length == 1) {
544               put.add(famAndQf[0], new byte[0], m.value);
545             } else {
546               put.add(famAndQf[0], famAndQf[1], m.value);
547             }
548           }
549         }
550         if (!delete.isEmpty())
551           table.delete(delete);
552         if (!put.isEmpty())
553           table.put(put);
554       } catch (IOException e) {
555         throw new IOError(e.getMessage());
556       } catch (IllegalArgumentException e) {
557         throw new IllegalArgument(e.getMessage());
558       }
559     }
560 
561     public void mutateRows(byte[] tableName, List<BatchMutation> rowBatches)
562         throws IOError, IllegalArgument, TException {
563       mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
564     }
565 
566     public void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp)
567         throws IOError, IllegalArgument, TException {
568       List<Put> puts = new ArrayList<Put>();
569       List<Delete> deletes = new ArrayList<Delete>();
570 
571       for (BatchMutation batch : rowBatches) {
572         byte[] row = batch.row;
573         List<Mutation> mutations = batch.mutations;
574         Delete delete = new Delete(row);
575         Put put = new Put(row, timestamp, null);
576         for (Mutation m : mutations) {
577           byte[][] famAndQf = KeyValue.parseColumn(m.column);
578           if (m.isDelete) {
579             // no qualifier, family only.
580             if (famAndQf.length == 1) {
581               delete.deleteFamily(famAndQf[0], timestamp);
582             } else {
583               delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
584             }
585           } else {
586             if(famAndQf.length == 1) {
587               put.add(famAndQf[0], new byte[0], m.value);
588             } else {
589               put.add(famAndQf[0], famAndQf[1], m.value);
590             }
591           }
592         }
593         if (!delete.isEmpty())
594           deletes.add(delete);
595         if (!put.isEmpty())
596           puts.add(put);
597       }
598 
599       HTable table = null;
600       try {
601         table = getTable(tableName);
602         if (!puts.isEmpty())
603           table.put(puts);
604         for (Delete del : deletes) {
605           table.delete(del);
606         }
607       } catch (IOException e) {
608         throw new IOError(e.getMessage());
609       } catch (IllegalArgumentException e) {
610         throw new IllegalArgument(e.getMessage());
611       }
612     }
613 
614     @Deprecated
615     public long atomicIncrement(byte[] tableName, byte[] row, byte[] column,
616         long amount) throws IOError, IllegalArgument, TException {
617       byte [][] famAndQf = KeyValue.parseColumn(column);
618       if(famAndQf.length == 1) {
619         return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
620             amount);
621       }
622       return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
623     }
624 
625     public long atomicIncrement(byte [] tableName, byte [] row, byte [] family,
626         byte [] qualifier, long amount)
627     throws IOError, IllegalArgument, TException {
628       HTable table;
629       try {
630         table = getTable(tableName);
631         return table.incrementColumnValue(row, family, qualifier, amount);
632       } catch (IOException e) {
633         throw new IOError(e.getMessage());
634       }
635     }
636 
637     public void scannerClose(int id) throws IOError, IllegalArgument {
638       LOG.debug("scannerClose: id=" + id);
639       ResultScanner scanner = getScanner(id);
640       if (scanner == null) {
641         throw new IllegalArgument("scanner ID is invalid");
642       }
643       scanner.close();
644       removeScanner(id);
645     }
646 
647     public List<TRowResult> scannerGetList(int id,int nbRows) throws IllegalArgument, IOError {
648         LOG.debug("scannerGetList: id=" + id);
649         ResultScanner scanner = getScanner(id);
650         if (null == scanner) {
651             throw new IllegalArgument("scanner ID is invalid");
652         }
653 
654         Result [] results = null;
655         try {
656             results = scanner.next(nbRows);
657             if (null == results) {
658                 return new ArrayList<TRowResult>();
659             }
660         } catch (IOException e) {
661             throw new IOError(e.getMessage());
662         }
663         return ThriftUtilities.rowResultFromHBase(results);
664     }
665     public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
666         return scannerGetList(id,1);
667     }
668     public int scannerOpen(byte[] tableName, byte[] startRow,
669             List<byte[]> columns) throws IOError {
670         try {
671           HTable table = getTable(tableName);
672           Scan scan = new Scan(startRow);
673           if(columns != null && columns.size() != 0) {
674             for(byte [] column : columns) {
675               byte [][] famQf = KeyValue.parseColumn(column);
676               if(famQf.length == 1) {
677                 scan.addFamily(famQf[0]);
678               } else {
679                 scan.addColumn(famQf[0], famQf[1]);
680               }
681             }
682           }
683           return addScanner(table.getScanner(scan));
684         } catch (IOException e) {
685           throw new IOError(e.getMessage());
686         }
687     }
688 
689     public int scannerOpenWithStop(byte[] tableName, byte[] startRow,
690         byte[] stopRow, List<byte[]> columns) throws IOError, TException {
691       try {
692         HTable table = getTable(tableName);
693         Scan scan = new Scan(startRow, stopRow);
694         if(columns != null && columns.size() != 0) {
695           for(byte [] column : columns) {
696             byte [][] famQf = KeyValue.parseColumn(column);
697             if(famQf.length == 1) {
698               scan.addFamily(famQf[0]);
699             } else {
700               scan.addColumn(famQf[0], famQf[1]);
701             }
702           }
703         }
704         return addScanner(table.getScanner(scan));
705       } catch (IOException e) {
706         throw new IOError(e.getMessage());
707       }
708     }
709 
710     @Override
711     public int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns) throws IOError, TException {
712       try {
713         HTable table = getTable(tableName);
714         Scan scan = new Scan(startAndPrefix);
715         Filter f = new WhileMatchFilter(
716             new PrefixFilter(startAndPrefix));
717         scan.setFilter(f);
718         if(columns != null && columns.size() != 0) {
719           for(byte [] column : columns) {
720             byte [][] famQf = KeyValue.parseColumn(column);
721             if(famQf.length == 1) {
722               scan.addFamily(famQf[0]);
723             } else {
724               scan.addColumn(famQf[0], famQf[1]);
725             }
726           }
727         }
728         return addScanner(table.getScanner(scan));
729       } catch (IOException e) {
730         throw new IOError(e.getMessage());
731       }
732     }
733 
734     public int scannerOpenTs(byte[] tableName, byte[] startRow,
735         List<byte[]> columns, long timestamp) throws IOError, TException {
736       try {
737         HTable table = getTable(tableName);
738         Scan scan = new Scan(startRow);
739         scan.setTimeRange(Long.MIN_VALUE, timestamp);
740         if(columns != null && columns.size() != 0) {
741           for(byte [] column : columns) {
742             byte [][] famQf = KeyValue.parseColumn(column);
743             if(famQf.length == 1) {
744               scan.addFamily(famQf[0]);
745             } else {
746               scan.addColumn(famQf[0], famQf[1]);
747             }
748           }
749         }
750         return addScanner(table.getScanner(scan));
751       } catch (IOException e) {
752         throw new IOError(e.getMessage());
753       }
754     }
755 
756     public int scannerOpenWithStopTs(byte[] tableName, byte[] startRow,
757         byte[] stopRow, List<byte[]> columns, long timestamp)
758         throws IOError, TException {
759       try {
760         HTable table = getTable(tableName);
761         Scan scan = new Scan(startRow, stopRow);
762         scan.setTimeRange(Long.MIN_VALUE, timestamp);
763         if(columns != null && columns.size() != 0) {
764           for(byte [] column : columns) {
765             byte [][] famQf = KeyValue.parseColumn(column);
766             if(famQf.length == 1) {
767               scan.addFamily(famQf[0]);
768             } else {
769               scan.addColumn(famQf[0], famQf[1]);
770             }
771           }
772         }
773         scan.setTimeRange(Long.MIN_VALUE, timestamp);
774         return addScanner(table.getScanner(scan));
775       } catch (IOException e) {
776         throw new IOError(e.getMessage());
777       }
778     }
779 
780     public Map<byte[], ColumnDescriptor> getColumnDescriptors(
781         byte[] tableName) throws IOError, TException {
782       try {
783         TreeMap<byte[], ColumnDescriptor> columns =
784           new TreeMap<byte[], ColumnDescriptor>(Bytes.BYTES_COMPARATOR);
785 
786         HTable table = getTable(tableName);
787         HTableDescriptor desc = table.getTableDescriptor();
788 
789         for (HColumnDescriptor e : desc.getFamilies()) {
790           ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
791           columns.put(col.name, col);
792         }
793         return columns;
794       } catch (IOException e) {
795         throw new IOError(e.getMessage());
796       }
797     }
798   }
799 
800   //
801   // Main program and support routines
802   //
803 
804   private static void printUsageAndExit(Options options, int exitCode) {
805     HelpFormatter formatter = new HelpFormatter();
806     formatter.printHelp("Thrift", null, options,
807             "To start the Thrift server run 'bin/hbase-daemon.sh start thrift'\n" +
808             "To shutdown the thrift server run 'bin/hbase-daemon.sh stop thrift' or" +
809             " send a kill signal to the thrift server pid",
810             true);
811       System.exit(exitCode);
812   }
813 
  // Port used when no "port" option is given; kept as a String because it is
  // the default passed to cmd.getOptionValue before being parsed as an int.
  private static final String DEFAULT_LISTEN_PORT = "9090";
815 
816   /*
817    * Start up the Thrift server.
818    * @param args
819    */
820   static private void doMain(final String[] args) throws Exception {
821     Log LOG = LogFactory.getLog("ThriftServer");
822 
823     Options options = new Options();
824     options.addOption("b", "bind", true, "Address to bind the Thrift server to. Not supported by the Nonblocking and HsHa server [default: 0.0.0.0]");
825     options.addOption("p", "port", true, "Port to bind to [default: 9090]");
826     options.addOption("f", "framed", false, "Use framed transport");
827     options.addOption("c", "compact", false, "Use the compact protocol");
828     options.addOption("h", "help", false, "Print help information");
829 
830     OptionGroup servers = new OptionGroup();
831     servers.addOption(new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
832     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
833     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
834     options.addOptionGroup(servers);
835 
836     CommandLineParser parser = new PosixParser();
837     CommandLine cmd = parser.parse(options, args);
838 
839     /**
840      * This is so complicated to please both bin/hbase and bin/hbase-daemon.
841      * hbase-daemon provides "start" and "stop" arguments
842      * hbase should print the help if no argument is provided
843      */
844     List<String> commandLine = Arrays.asList(args);
845     boolean stop = commandLine.contains("stop");
846     boolean start = commandLine.contains("start");
847     if (cmd.hasOption("help") || !start || stop) {
848       printUsageAndExit(options, 1);
849     }
850 
851     // Get port to bind to
852     int listenPort = 0;
853     try {
854       listenPort = Integer.parseInt(cmd.getOptionValue("port", DEFAULT_LISTEN_PORT));
855     } catch (NumberFormatException e) {
856       LOG.error("Could not parse the value provided for the port option", e);
857       printUsageAndExit(options, -1);
858     }
859 
860     // Construct correct ProtocolFactory
861     TProtocolFactory protocolFactory;
862     if (cmd.hasOption("compact")) {
863       LOG.debug("Using compact protocol");
864       protocolFactory = new TCompactProtocol.Factory();
865     } else {
866       LOG.debug("Using binary protocol");
867       protocolFactory = new TBinaryProtocol.Factory();
868     }
869 
870     HBaseHandler handler = new HBaseHandler();
871     Hbase.Processor processor = new Hbase.Processor(handler);
872 
873     TServer server;
874     if (cmd.hasOption("nonblocking") || cmd.hasOption("hsha")) {
875       if (cmd.hasOption("bind")) {
876         LOG.error("The Nonblocking and HsHa servers don't support IP address binding at the moment." +
877                 " See https://issues.apache.org/jira/browse/HBASE-2155 for details.");
878         printUsageAndExit(options, -1);
879       }
880 
881       TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(listenPort);
882       TFramedTransport.Factory transportFactory = new TFramedTransport.Factory();
883 
884       if (cmd.hasOption("nonblocking")) {
885         LOG.info("starting HBase Nonblocking Thrift server on " + Integer.toString(listenPort));
886         server = new TNonblockingServer(processor, serverTransport, transportFactory, protocolFactory);
887       } else {
888         LOG.info("starting HBase HsHA Thrift server on " + Integer.toString(listenPort));
889         server = new THsHaServer(processor, serverTransport, transportFactory, protocolFactory);
890       }
891     } else {
892       // Get IP address to bind to
893       InetAddress listenAddress = null;
894       if (cmd.hasOption("bind")) {
895         try {
896           listenAddress = InetAddress.getByName(cmd.getOptionValue("bind"));
897         } catch (UnknownHostException e) {
898           LOG.error("Could not bind to provided ip address", e);
899           printUsageAndExit(options, -1);
900         }
901       } else {
902         listenAddress = InetAddress.getByName("0.0.0.0");
903       }
904       TServerTransport serverTransport = new TServerSocket(new InetSocketAddress(listenAddress, listenPort));
905 
906       // Construct correct TransportFactory
907       TTransportFactory transportFactory;
908       if (cmd.hasOption("framed")) {
909         transportFactory = new TFramedTransport.Factory();
910         LOG.debug("Using framed transport");
911       } else {
912         transportFactory = new TTransportFactory();
913       }
914 
915       LOG.info("starting HBase ThreadPool Thrift server on " + listenAddress + ":" + Integer.toString(listenPort));
916       server = new TThreadPoolServer(processor, serverTransport, transportFactory, protocolFactory);
917     }
918 
919     server.serve();
920   }
921 
922   /**
923    * @param args
924    * @throws Exception
925    */
926   public static void main(String [] args) throws Exception {
927     doMain(args);
928   }
929 }