View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.avro;
20  
21  import java.io.IOException;
22  import java.nio.ByteBuffer;
23  import java.util.HashMap;
24  
25  import org.apache.avro.Schema;
26  import org.apache.avro.generic.GenericArray;
27  import org.apache.avro.generic.GenericData;
28  import org.apache.avro.ipc.HttpServer;
29  import org.apache.avro.specific.SpecificResponder;
30  import org.apache.avro.util.Utf8;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.hbase.HBaseConfiguration;
35  import org.apache.hadoop.hbase.HTableDescriptor;
36  import org.apache.hadoop.hbase.MasterNotRunningException;
37  import org.apache.hadoop.hbase.TableExistsException;
38  import org.apache.hadoop.hbase.avro.generated.AClusterStatus;
39  import org.apache.hadoop.hbase.avro.generated.ADelete;
40  import org.apache.hadoop.hbase.avro.generated.AFamilyDescriptor;
41  import org.apache.hadoop.hbase.avro.generated.AGet;
42  import org.apache.hadoop.hbase.avro.generated.AIOError;
43  import org.apache.hadoop.hbase.avro.generated.AIllegalArgument;
44  import org.apache.hadoop.hbase.avro.generated.AMasterNotRunning;
45  import org.apache.hadoop.hbase.avro.generated.APut;
46  import org.apache.hadoop.hbase.avro.generated.AResult;
47  import org.apache.hadoop.hbase.avro.generated.AScan;
48  import org.apache.hadoop.hbase.avro.generated.ATableDescriptor;
49  import org.apache.hadoop.hbase.avro.generated.ATableExists;
50  import org.apache.hadoop.hbase.avro.generated.HBase;
51  import org.apache.hadoop.hbase.client.HBaseAdmin;
52  import org.apache.hadoop.hbase.client.HTableInterface;
53  import org.apache.hadoop.hbase.client.HTablePool;
54  import org.apache.hadoop.hbase.client.ResultScanner;
55  import org.apache.hadoop.hbase.client.Scan;
56  import org.apache.hadoop.hbase.util.Bytes;
57  
58  /**
59   * Start an Avro server
60   */
61  public class AvroServer {
62  
63    /**
64     * The HBaseImpl is a glue object that connects Avro RPC calls to the
65     * HBase client API primarily defined in the HBaseAdmin and HTable objects.
66     */
67    public static class HBaseImpl implements HBase {
68      //
69      // PROPERTIES
70      //
71      protected Configuration conf = null;
72      protected HBaseAdmin admin = null;
73      protected HTablePool htablePool = null;
74      protected final Log LOG = LogFactory.getLog(this.getClass().getName());
75  
76      // nextScannerId and scannerMap are used to manage scanner state
77      protected int nextScannerId = 0;
78      protected HashMap<Integer, ResultScanner> scannerMap = null;
79  
80      //
81      // UTILITY METHODS
82      //
83  
84      /**
85       * Assigns a unique ID to the scanner and adds the mapping to an internal
86       * hash-map.
87       *
88       * @param scanner
89       * @return integer scanner id
90       */
91      protected synchronized int addScanner(ResultScanner scanner) {
92        int id = nextScannerId++;
93        scannerMap.put(id, scanner);
94        return id;
95      }
96  
97      /**
98       * Returns the scanner associated with the specified ID.
99       *
100      * @param id
101      * @return a Scanner, or null if ID was invalid.
102      */
103     protected synchronized ResultScanner getScanner(int id) {
104       return scannerMap.get(id);
105     }
106 
107     /**
108      * Removes the scanner associated with the specified ID from the internal
109      * id->scanner hash-map.
110      *
111      * @param id
112      * @return a Scanner, or null if ID was invalid.
113      */
114     protected synchronized ResultScanner removeScanner(int id) {
115       return scannerMap.remove(id);
116     }
117 
118     //
119     // CTOR METHODS
120     //
121 
122     // TODO(hammer): figure out appropriate setting of maxSize for htablePool
123     /**
124      * Constructs an HBaseImpl object.
125      * @throws IOException 
126      */
127     HBaseImpl() throws IOException {
128       this(HBaseConfiguration.create());
129     }
130 
131     HBaseImpl(final Configuration c) throws IOException {
132       conf = c;
133       admin = new HBaseAdmin(conf);
134       htablePool = new HTablePool(conf, 10);
135       scannerMap = new HashMap<Integer, ResultScanner>();
136     }
137 
138     //
139     // SERVICE METHODS
140     //
141 
142     // TODO(hammer): Investigate use of the Command design pattern
143 
144     //
145     // Cluster metadata
146     //
147 
148     public Utf8 getHBaseVersion() throws AIOError {
149       try {
150 	return new Utf8(admin.getClusterStatus().getHBaseVersion());
151       } catch (IOException e) {
152 	AIOError ioe = new AIOError();
153 	ioe.message = new Utf8(e.getMessage());
154         throw ioe;
155       }
156     }
157 
158     public AClusterStatus getClusterStatus() throws AIOError {
159       try {
160 	return AvroUtil.csToACS(admin.getClusterStatus());
161       } catch (IOException e) {
162 	AIOError ioe = new AIOError();
163 	ioe.message = new Utf8(e.getMessage());
164         throw ioe;
165       }
166     }
167 
168     public GenericArray<ATableDescriptor> listTables() throws AIOError {
169       try {
170         HTableDescriptor[] tables = admin.listTables();
171 	Schema atdSchema = Schema.createArray(ATableDescriptor.SCHEMA$);
172         GenericData.Array<ATableDescriptor> result = null;
173 	result = new GenericData.Array<ATableDescriptor>(tables.length, atdSchema);
174         for (HTableDescriptor table : tables) {
175 	  result.add(AvroUtil.htdToATD(table));
176 	}
177         return result;
178       } catch (IOException e) {
179 	AIOError ioe = new AIOError();
180 	ioe.message = new Utf8(e.getMessage());
181         throw ioe;
182       }
183     }
184 
185     //
186     // Table metadata
187     //
188 
189     // TODO(hammer): Handle the case where the table does not exist explicitly?
190     public ATableDescriptor describeTable(ByteBuffer table) throws AIOError {
191       try {
192 	return AvroUtil.htdToATD(admin.getTableDescriptor(Bytes.toBytes(table)));
193       } catch (IOException e) {
194         AIOError ioe = new AIOError();
195         ioe.message = new Utf8(e.getMessage());
196         throw ioe;
197       }
198     }
199 
200     public boolean isTableEnabled(ByteBuffer table) throws AIOError {
201       try {
202 	return admin.isTableEnabled(Bytes.toBytes(table));
203       } catch (IOException e) {
204 	AIOError ioe = new AIOError();
205 	ioe.message = new Utf8(e.getMessage());
206         throw ioe;
207       }
208     }
209 
210     public boolean tableExists(ByteBuffer table) throws AIOError {
211       try {
212 	return admin.tableExists(Bytes.toBytes(table));
213       } catch (IOException e) {
214 	AIOError ioe = new AIOError();
215 	ioe.message = new Utf8(e.getMessage());
216         throw ioe;
217       }
218     }
219 
220     //
221     // Family metadata
222     //
223 
224     // TODO(hammer): Handle the case where the family does not exist explicitly?
225     public AFamilyDescriptor describeFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
226       try {
227 	HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(table));
228 	return AvroUtil.hcdToAFD(htd.getFamily(Bytes.toBytes(family)));
229       } catch (IOException e) {
230         AIOError ioe = new AIOError();
231         ioe.message = new Utf8(e.getMessage());
232         throw ioe;
233       }
234     }
235 
236     //
237     // Table admin
238     //
239 
240     public Void createTable(ATableDescriptor table) throws AIOError, 
241                                                            AIllegalArgument,
242                                                            ATableExists,
243                                                            AMasterNotRunning {
244       try {
245         admin.createTable(AvroUtil.atdToHTD(table));
246 	return null;
247       } catch (IllegalArgumentException e) {
248 	AIllegalArgument iae = new AIllegalArgument();
249 	iae.message = new Utf8(e.getMessage());
250         throw iae;
251       } catch (TableExistsException e) {
252 	ATableExists tee = new ATableExists();
253 	tee.message = new Utf8(e.getMessage());
254         throw tee;
255       } catch (MasterNotRunningException e) {
256 	AMasterNotRunning mnre = new AMasterNotRunning();
257 	mnre.message = new Utf8(e.getMessage());
258         throw mnre;
259       } catch (IOException e) {
260 	AIOError ioe = new AIOError();
261 	ioe.message = new Utf8(e.getMessage());
262         throw ioe;
263       }
264     }
265 
266     // Note that disable, flush and major compaction of .META. needed in client
267     // TODO(hammer): more selective cache dirtying than flush?
268     public Void deleteTable(ByteBuffer table) throws AIOError {
269       try {
270 	admin.deleteTable(Bytes.toBytes(table));
271 	return null;
272       } catch (IOException e) {
273 	AIOError ioe = new AIOError();
274 	ioe.message = new Utf8(e.getMessage());
275         throw ioe;
276       }
277     }
278 
279     // NB: Asynchronous operation
280     public Void modifyTable(ByteBuffer tableName, ATableDescriptor tableDescriptor) throws AIOError {
281       try {
282 	admin.modifyTable(Bytes.toBytes(tableName),
283                           AvroUtil.atdToHTD(tableDescriptor));
284 	return null;
285       } catch (IOException e) {
286 	AIOError ioe = new AIOError();
287 	ioe.message = new Utf8(e.getMessage());
288         throw ioe;
289       }
290     }
291 
292     public Void enableTable(ByteBuffer table) throws AIOError {
293       try {
294 	admin.enableTable(Bytes.toBytes(table));
295 	return null;
296       } catch (IOException e) {
297 	AIOError ioe = new AIOError();
298 	ioe.message = new Utf8(e.getMessage());
299         throw ioe;
300       }
301     }
302     
303     public Void disableTable(ByteBuffer table) throws AIOError {
304       try {
305 	admin.disableTable(Bytes.toBytes(table));
306 	return null;
307       } catch (IOException e) {
308 	AIOError ioe = new AIOError();
309 	ioe.message = new Utf8(e.getMessage());
310         throw ioe;
311       }
312     }
313     
314     // NB: Asynchronous operation
315     public Void flush(ByteBuffer table) throws AIOError {
316       try {
317 	admin.flush(Bytes.toBytes(table));
318 	return null;
319       } catch (InterruptedException e) {
320 	AIOError ioe = new AIOError();
321 	ioe.message = new Utf8(e.getMessage());
322         throw ioe;
323       } catch (IOException e) {
324 	AIOError ioe = new AIOError();
325 	ioe.message = new Utf8(e.getMessage());
326         throw ioe;
327       }
328     }
329 
330     // NB: Asynchronous operation
331     public Void split(ByteBuffer table) throws AIOError {
332       try {
333 	admin.split(Bytes.toBytes(table));
334 	return null;
335       } catch (InterruptedException e) {
336 	AIOError ioe = new AIOError();
337 	ioe.message = new Utf8(e.getMessage());
338         throw ioe;
339       } catch (IOException e) {
340 	AIOError ioe = new AIOError();
341 	ioe.message = new Utf8(e.getMessage());
342         throw ioe;
343       }
344     }
345 
346     //
347     // Family admin
348     //
349 
350     public Void addFamily(ByteBuffer table, AFamilyDescriptor family) throws AIOError {
351       try {
352 	admin.addColumn(Bytes.toBytes(table), 
353                         AvroUtil.afdToHCD(family));
354 	return null;
355       } catch (IOException e) {
356 	AIOError ioe = new AIOError();
357 	ioe.message = new Utf8(e.getMessage());
358         throw ioe;
359       }
360     }
361 
362     // NB: Asynchronous operation
363     public Void deleteFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
364       try {
365 	admin.deleteColumn(Bytes.toBytes(table), Bytes.toBytes(family));
366 	return null;
367       } catch (IOException e) {
368 	AIOError ioe = new AIOError();
369 	ioe.message = new Utf8(e.getMessage());
370         throw ioe;
371       }
372     }
373 
374     // NB: Asynchronous operation
375     public Void modifyFamily(ByteBuffer table, ByteBuffer familyName, AFamilyDescriptor familyDescriptor) throws AIOError {
376       try {
377 	admin.modifyColumn(Bytes.toBytes(table), AvroUtil.afdToHCD(familyDescriptor));
378 	return null;
379       } catch (IOException e) {
380 	AIOError ioe = new AIOError();
381 	ioe.message = new Utf8(e.getMessage());
382         throw ioe;
383       }
384     }
385 
386     //
387     // Single-row DML
388     //
389 
390     // TODO(hammer): Java with statement for htablepool concision?
391     // TODO(hammer): Can Get have timestamp and timerange simultaneously?
392     // TODO(hammer): Do I need to catch the RuntimeException of getTable?
393     // TODO(hammer): Handle gets with no results
394     // TODO(hammer): Uses exists(Get) to ensure columns exist
395     public AResult get(ByteBuffer table, AGet aget) throws AIOError {
396       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
397       try {
398         return AvroUtil.resultToAResult(htable.get(AvroUtil.agetToGet(aget)));
399       } catch (IOException e) {
400     	AIOError ioe = new AIOError();
401 	ioe.message = new Utf8(e.getMessage());
402         throw ioe;
403       } finally {
404         htablePool.putTable(htable);
405       }
406     }
407 
408     public boolean exists(ByteBuffer table, AGet aget) throws AIOError {
409       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
410       try {
411         return htable.exists(AvroUtil.agetToGet(aget));
412       } catch (IOException e) {
413     	AIOError ioe = new AIOError();
414 	ioe.message = new Utf8(e.getMessage());
415         throw ioe;
416       } finally {
417         htablePool.putTable(htable);
418       }
419     }
420 
421     public Void put(ByteBuffer table, APut aput) throws AIOError {
422       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
423       try {
424 	htable.put(AvroUtil.aputToPut(aput));
425         return null;
426       } catch (IOException e) {
427         AIOError ioe = new AIOError();
428         ioe.message = new Utf8(e.getMessage());
429         throw ioe;
430       } finally {
431         htablePool.putTable(htable);
432       }
433     }
434 
435     public Void delete(ByteBuffer table, ADelete adelete) throws AIOError {
436       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
437       try {
438         htable.delete(AvroUtil.adeleteToDelete(adelete));
439         return null;
440       } catch (IOException e) {
441     	AIOError ioe = new AIOError();
442 	ioe.message = new Utf8(e.getMessage());
443         throw ioe;
444       } finally {
445         htablePool.putTable(htable);
446       }
447     }
448 
449     public long incrementColumnValue(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, long amount, boolean writeToWAL) throws AIOError {
450       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
451       try {
452 	return htable.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier), amount, writeToWAL);
453       } catch (IOException e) {
454         AIOError ioe = new AIOError();
455         ioe.message = new Utf8(e.getMessage());
456         throw ioe;
457       } finally {
458         htablePool.putTable(htable);
459       }
460     }
461 
462     //
463     // Multi-row DML
464     //
465 
466     public int scannerOpen(ByteBuffer table, AScan ascan) throws AIOError {
467       HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
468       try {
469         Scan scan = AvroUtil.ascanToScan(ascan);
470         return addScanner(htable.getScanner(scan));
471       } catch (IOException e) {
472     	AIOError ioe = new AIOError();
473 	ioe.message = new Utf8(e.getMessage());
474         throw ioe;
475       } finally {
476         htablePool.putTable(htable);
477       }
478     }
479 
480     public Void scannerClose(int scannerId) throws AIOError, AIllegalArgument {
481       try {
482         ResultScanner scanner = getScanner(scannerId);
483         if (scanner == null) {
484       	  AIllegalArgument aie = new AIllegalArgument();
485 	  aie.message = new Utf8("scanner ID is invalid: " + scannerId);
486           throw aie;
487         }
488         scanner.close();
489         removeScanner(scannerId);
490         return null;
491       } catch (IOException e) {
492     	AIOError ioe = new AIOError();
493 	ioe.message = new Utf8(e.getMessage());
494         throw ioe;
495       }
496     }
497 
498     public GenericArray<AResult> scannerGetRows(int scannerId, int numberOfRows) throws AIOError, AIllegalArgument {
499       try {
500         ResultScanner scanner = getScanner(scannerId);
501         if (scanner == null) {
502       	  AIllegalArgument aie = new AIllegalArgument();
503 	  aie.message = new Utf8("scanner ID is invalid: " + scannerId);
504           throw aie;
505         }
506         return AvroUtil.resultsToAResults(scanner.next(numberOfRows));
507       } catch (IOException e) {
508     	AIOError ioe = new AIOError();
509 	ioe.message = new Utf8(e.getMessage());
510         throw ioe;
511       }
512     }
513   }
514 
515   //
516   // MAIN PROGRAM
517   //
518 
519   private static void printUsageAndExit() {
520     printUsageAndExit(null);
521   }
522   
523   private static void printUsageAndExit(final String message) {
524     if (message != null) {
525       System.err.println(message);
526     }
527     System.out.println("Usage: java org.apache.hadoop.hbase.avro.AvroServer " +
528       "--help | [--port=PORT] start");
529     System.out.println("Arguments:");
530     System.out.println(" start Start Avro server");
531     System.out.println(" stop  Stop Avro server");
532     System.out.println("Options:");
533     System.out.println(" port  Port to listen on. Default: 9090");
534     System.out.println(" help  Print this message and exit");
535     System.exit(0);
536   }
537 
538   // TODO(hammer): Figure out a better way to keep the server alive!
539   protected static void doMain(final String[] args) throws Exception {
540     if (args.length < 1) {
541       printUsageAndExit();
542     }
543     int port = 9090;
544     final String portArgKey = "--port=";
545     for (String cmd: args) {
546       if (cmd.startsWith(portArgKey)) {
547         port = Integer.parseInt(cmd.substring(portArgKey.length()));
548         continue;
549       } else if (cmd.equals("--help") || cmd.equals("-h")) {
550         printUsageAndExit();
551       } else if (cmd.equals("start")) {
552         continue;
553       } else if (cmd.equals("stop")) {
554         printUsageAndExit("To shutdown the Avro server run " +
555           "bin/hbase-daemon.sh stop avro or send a kill signal to " +
556           "the Avro server pid");
557       }
558       
559       // Print out usage if we get to here.
560       printUsageAndExit();
561     }
562     Log LOG = LogFactory.getLog("AvroServer");
563     LOG.info("starting HBase Avro server on port " + Integer.toString(port));
564     SpecificResponder r = new SpecificResponder(HBase.class, new HBaseImpl());
565     new HttpServer(r, 9090);
566     Thread.sleep(1000000);
567   }
568 
569   // TODO(hammer): Look at Cassandra's daemonization and integration with JSVC
570   // TODO(hammer): Don't eat it after a single exception
571   // TODO(hammer): Figure out why we do doMain()
572   // TODO(hammer): Figure out if we want String[] or String [] syntax
573   public static void main(String[] args) throws Exception {
574     doMain(args);
575   }
576 }