1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.avro;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.util.HashMap;
24
25 import org.apache.avro.Schema;
26 import org.apache.avro.generic.GenericArray;
27 import org.apache.avro.generic.GenericData;
28 import org.apache.avro.ipc.HttpServer;
29 import org.apache.avro.specific.SpecificResponder;
30 import org.apache.avro.util.Utf8;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.hbase.HBaseConfiguration;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.MasterNotRunningException;
37 import org.apache.hadoop.hbase.TableExistsException;
38 import org.apache.hadoop.hbase.avro.generated.AClusterStatus;
39 import org.apache.hadoop.hbase.avro.generated.ADelete;
40 import org.apache.hadoop.hbase.avro.generated.AFamilyDescriptor;
41 import org.apache.hadoop.hbase.avro.generated.AGet;
42 import org.apache.hadoop.hbase.avro.generated.AIOError;
43 import org.apache.hadoop.hbase.avro.generated.AIllegalArgument;
44 import org.apache.hadoop.hbase.avro.generated.AMasterNotRunning;
45 import org.apache.hadoop.hbase.avro.generated.APut;
46 import org.apache.hadoop.hbase.avro.generated.AResult;
47 import org.apache.hadoop.hbase.avro.generated.AScan;
48 import org.apache.hadoop.hbase.avro.generated.ATableDescriptor;
49 import org.apache.hadoop.hbase.avro.generated.ATableExists;
50 import org.apache.hadoop.hbase.avro.generated.HBase;
51 import org.apache.hadoop.hbase.client.HBaseAdmin;
52 import org.apache.hadoop.hbase.client.HTableInterface;
53 import org.apache.hadoop.hbase.client.HTablePool;
54 import org.apache.hadoop.hbase.client.ResultScanner;
55 import org.apache.hadoop.hbase.client.Scan;
56 import org.apache.hadoop.hbase.util.Bytes;
57
58
59
60
61 public class AvroServer {
62
63
64
65
66
67 public static class HBaseImpl implements HBase {
68
69
70
71 protected Configuration conf = null;
72 protected HBaseAdmin admin = null;
73 protected HTablePool htablePool = null;
74 protected final Log LOG = LogFactory.getLog(this.getClass().getName());
75
76
77 protected int nextScannerId = 0;
78 protected HashMap<Integer, ResultScanner> scannerMap = null;
79
80
81
82
83
84
85
86
87
88
89
90
91 protected synchronized int addScanner(ResultScanner scanner) {
92 int id = nextScannerId++;
93 scannerMap.put(id, scanner);
94 return id;
95 }
96
97
98
99
100
101
102
103 protected synchronized ResultScanner getScanner(int id) {
104 return scannerMap.get(id);
105 }
106
107
108
109
110
111
112
113
114 protected synchronized ResultScanner removeScanner(int id) {
115 return scannerMap.remove(id);
116 }
117
118
119
120
121
122
123
124
125
126
/**
 * Builds an instance backed by the configuration returned from
 * {@code HBaseConfiguration.create()}.
 *
 * @throws IOException propagated from the configuration-taking constructor
 */
HBaseImpl() throws IOException {
  this(HBaseConfiguration.create());
}

/**
 * Builds an instance backed by the supplied configuration: creates the admin
 * client, a table pool constructed with the value 10, and an empty scanner map.
 *
 * @param c the configuration to use
 * @throws IOException declared for failures while constructing the collaborators
 */
HBaseImpl(final Configuration c) throws IOException {
  this.conf = c;
  this.admin = new HBaseAdmin(this.conf);
  this.htablePool = new HTablePool(this.conf, 10);
  this.scannerMap = new HashMap<Integer, ResultScanner>();
}
137
138
139
140
141
142
143
144
145
146
147
148 public Utf8 getHBaseVersion() throws AIOError {
149 try {
150 return new Utf8(admin.getClusterStatus().getHBaseVersion());
151 } catch (IOException e) {
152 AIOError ioe = new AIOError();
153 ioe.message = new Utf8(e.getMessage());
154 throw ioe;
155 }
156 }
157
158 public AClusterStatus getClusterStatus() throws AIOError {
159 try {
160 return AvroUtil.csToACS(admin.getClusterStatus());
161 } catch (IOException e) {
162 AIOError ioe = new AIOError();
163 ioe.message = new Utf8(e.getMessage());
164 throw ioe;
165 }
166 }
167
168 public GenericArray<ATableDescriptor> listTables() throws AIOError {
169 try {
170 HTableDescriptor[] tables = admin.listTables();
171 Schema atdSchema = Schema.createArray(ATableDescriptor.SCHEMA$);
172 GenericData.Array<ATableDescriptor> result = null;
173 result = new GenericData.Array<ATableDescriptor>(tables.length, atdSchema);
174 for (HTableDescriptor table : tables) {
175 result.add(AvroUtil.htdToATD(table));
176 }
177 return result;
178 } catch (IOException e) {
179 AIOError ioe = new AIOError();
180 ioe.message = new Utf8(e.getMessage());
181 throw ioe;
182 }
183 }
184
185
186
187
188
189
190 public ATableDescriptor describeTable(ByteBuffer table) throws AIOError {
191 try {
192 return AvroUtil.htdToATD(admin.getTableDescriptor(Bytes.toBytes(table)));
193 } catch (IOException e) {
194 AIOError ioe = new AIOError();
195 ioe.message = new Utf8(e.getMessage());
196 throw ioe;
197 }
198 }
199
200 public boolean isTableEnabled(ByteBuffer table) throws AIOError {
201 try {
202 return admin.isTableEnabled(Bytes.toBytes(table));
203 } catch (IOException e) {
204 AIOError ioe = new AIOError();
205 ioe.message = new Utf8(e.getMessage());
206 throw ioe;
207 }
208 }
209
210 public boolean tableExists(ByteBuffer table) throws AIOError {
211 try {
212 return admin.tableExists(Bytes.toBytes(table));
213 } catch (IOException e) {
214 AIOError ioe = new AIOError();
215 ioe.message = new Utf8(e.getMessage());
216 throw ioe;
217 }
218 }
219
220
221
222
223
224
225 public AFamilyDescriptor describeFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
226 try {
227 HTableDescriptor htd = admin.getTableDescriptor(Bytes.toBytes(table));
228 return AvroUtil.hcdToAFD(htd.getFamily(Bytes.toBytes(family)));
229 } catch (IOException e) {
230 AIOError ioe = new AIOError();
231 ioe.message = new Utf8(e.getMessage());
232 throw ioe;
233 }
234 }
235
236
237
238
239
240 public Void createTable(ATableDescriptor table) throws AIOError,
241 AIllegalArgument,
242 ATableExists,
243 AMasterNotRunning {
244 try {
245 admin.createTable(AvroUtil.atdToHTD(table));
246 return null;
247 } catch (IllegalArgumentException e) {
248 AIllegalArgument iae = new AIllegalArgument();
249 iae.message = new Utf8(e.getMessage());
250 throw iae;
251 } catch (TableExistsException e) {
252 ATableExists tee = new ATableExists();
253 tee.message = new Utf8(e.getMessage());
254 throw tee;
255 } catch (MasterNotRunningException e) {
256 AMasterNotRunning mnre = new AMasterNotRunning();
257 mnre.message = new Utf8(e.getMessage());
258 throw mnre;
259 } catch (IOException e) {
260 AIOError ioe = new AIOError();
261 ioe.message = new Utf8(e.getMessage());
262 throw ioe;
263 }
264 }
265
266
267
268 public Void deleteTable(ByteBuffer table) throws AIOError {
269 try {
270 admin.deleteTable(Bytes.toBytes(table));
271 return null;
272 } catch (IOException e) {
273 AIOError ioe = new AIOError();
274 ioe.message = new Utf8(e.getMessage());
275 throw ioe;
276 }
277 }
278
279
280 public Void modifyTable(ByteBuffer tableName, ATableDescriptor tableDescriptor) throws AIOError {
281 try {
282 admin.modifyTable(Bytes.toBytes(tableName),
283 AvroUtil.atdToHTD(tableDescriptor));
284 return null;
285 } catch (IOException e) {
286 AIOError ioe = new AIOError();
287 ioe.message = new Utf8(e.getMessage());
288 throw ioe;
289 }
290 }
291
292 public Void enableTable(ByteBuffer table) throws AIOError {
293 try {
294 admin.enableTable(Bytes.toBytes(table));
295 return null;
296 } catch (IOException e) {
297 AIOError ioe = new AIOError();
298 ioe.message = new Utf8(e.getMessage());
299 throw ioe;
300 }
301 }
302
303 public Void disableTable(ByteBuffer table) throws AIOError {
304 try {
305 admin.disableTable(Bytes.toBytes(table));
306 return null;
307 } catch (IOException e) {
308 AIOError ioe = new AIOError();
309 ioe.message = new Utf8(e.getMessage());
310 throw ioe;
311 }
312 }
313
314
315 public Void flush(ByteBuffer table) throws AIOError {
316 try {
317 admin.flush(Bytes.toBytes(table));
318 return null;
319 } catch (InterruptedException e) {
320 AIOError ioe = new AIOError();
321 ioe.message = new Utf8(e.getMessage());
322 throw ioe;
323 } catch (IOException e) {
324 AIOError ioe = new AIOError();
325 ioe.message = new Utf8(e.getMessage());
326 throw ioe;
327 }
328 }
329
330
331 public Void split(ByteBuffer table) throws AIOError {
332 try {
333 admin.split(Bytes.toBytes(table));
334 return null;
335 } catch (InterruptedException e) {
336 AIOError ioe = new AIOError();
337 ioe.message = new Utf8(e.getMessage());
338 throw ioe;
339 } catch (IOException e) {
340 AIOError ioe = new AIOError();
341 ioe.message = new Utf8(e.getMessage());
342 throw ioe;
343 }
344 }
345
346
347
348
349
350 public Void addFamily(ByteBuffer table, AFamilyDescriptor family) throws AIOError {
351 try {
352 admin.addColumn(Bytes.toBytes(table),
353 AvroUtil.afdToHCD(family));
354 return null;
355 } catch (IOException e) {
356 AIOError ioe = new AIOError();
357 ioe.message = new Utf8(e.getMessage());
358 throw ioe;
359 }
360 }
361
362
363 public Void deleteFamily(ByteBuffer table, ByteBuffer family) throws AIOError {
364 try {
365 admin.deleteColumn(Bytes.toBytes(table), Bytes.toBytes(family));
366 return null;
367 } catch (IOException e) {
368 AIOError ioe = new AIOError();
369 ioe.message = new Utf8(e.getMessage());
370 throw ioe;
371 }
372 }
373
374
375 public Void modifyFamily(ByteBuffer table, ByteBuffer familyName, AFamilyDescriptor familyDescriptor) throws AIOError {
376 try {
377 admin.modifyColumn(Bytes.toBytes(table), AvroUtil.afdToHCD(familyDescriptor));
378 return null;
379 } catch (IOException e) {
380 AIOError ioe = new AIOError();
381 ioe.message = new Utf8(e.getMessage());
382 throw ioe;
383 }
384 }
385
386
387
388
389
390
391
392
393
394
395 public AResult get(ByteBuffer table, AGet aget) throws AIOError {
396 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
397 try {
398 return AvroUtil.resultToAResult(htable.get(AvroUtil.agetToGet(aget)));
399 } catch (IOException e) {
400 AIOError ioe = new AIOError();
401 ioe.message = new Utf8(e.getMessage());
402 throw ioe;
403 } finally {
404 htablePool.putTable(htable);
405 }
406 }
407
408 public boolean exists(ByteBuffer table, AGet aget) throws AIOError {
409 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
410 try {
411 return htable.exists(AvroUtil.agetToGet(aget));
412 } catch (IOException e) {
413 AIOError ioe = new AIOError();
414 ioe.message = new Utf8(e.getMessage());
415 throw ioe;
416 } finally {
417 htablePool.putTable(htable);
418 }
419 }
420
421 public Void put(ByteBuffer table, APut aput) throws AIOError {
422 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
423 try {
424 htable.put(AvroUtil.aputToPut(aput));
425 return null;
426 } catch (IOException e) {
427 AIOError ioe = new AIOError();
428 ioe.message = new Utf8(e.getMessage());
429 throw ioe;
430 } finally {
431 htablePool.putTable(htable);
432 }
433 }
434
435 public Void delete(ByteBuffer table, ADelete adelete) throws AIOError {
436 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
437 try {
438 htable.delete(AvroUtil.adeleteToDelete(adelete));
439 return null;
440 } catch (IOException e) {
441 AIOError ioe = new AIOError();
442 ioe.message = new Utf8(e.getMessage());
443 throw ioe;
444 } finally {
445 htablePool.putTable(htable);
446 }
447 }
448
449 public long incrementColumnValue(ByteBuffer table, ByteBuffer row, ByteBuffer family, ByteBuffer qualifier, long amount, boolean writeToWAL) throws AIOError {
450 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
451 try {
452 return htable.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier), amount, writeToWAL);
453 } catch (IOException e) {
454 AIOError ioe = new AIOError();
455 ioe.message = new Utf8(e.getMessage());
456 throw ioe;
457 } finally {
458 htablePool.putTable(htable);
459 }
460 }
461
462
463
464
465
466 public int scannerOpen(ByteBuffer table, AScan ascan) throws AIOError {
467 HTableInterface htable = htablePool.getTable(Bytes.toBytes(table));
468 try {
469 Scan scan = AvroUtil.ascanToScan(ascan);
470 return addScanner(htable.getScanner(scan));
471 } catch (IOException e) {
472 AIOError ioe = new AIOError();
473 ioe.message = new Utf8(e.getMessage());
474 throw ioe;
475 } finally {
476 htablePool.putTable(htable);
477 }
478 }
479
480 public Void scannerClose(int scannerId) throws AIOError, AIllegalArgument {
481 try {
482 ResultScanner scanner = getScanner(scannerId);
483 if (scanner == null) {
484 AIllegalArgument aie = new AIllegalArgument();
485 aie.message = new Utf8("scanner ID is invalid: " + scannerId);
486 throw aie;
487 }
488 scanner.close();
489 removeScanner(scannerId);
490 return null;
491 } catch (IOException e) {
492 AIOError ioe = new AIOError();
493 ioe.message = new Utf8(e.getMessage());
494 throw ioe;
495 }
496 }
497
498 public GenericArray<AResult> scannerGetRows(int scannerId, int numberOfRows) throws AIOError, AIllegalArgument {
499 try {
500 ResultScanner scanner = getScanner(scannerId);
501 if (scanner == null) {
502 AIllegalArgument aie = new AIllegalArgument();
503 aie.message = new Utf8("scanner ID is invalid: " + scannerId);
504 throw aie;
505 }
506 return AvroUtil.resultsToAResults(scanner.next(numberOfRows));
507 } catch (IOException e) {
508 AIOError ioe = new AIOError();
509 ioe.message = new Utf8(e.getMessage());
510 throw ioe;
511 }
512 }
513 }
514
515
516
517
518
519 private static void printUsageAndExit() {
520 printUsageAndExit(null);
521 }
522
523 private static void printUsageAndExit(final String message) {
524 if (message != null) {
525 System.err.println(message);
526 }
527 System.out.println("Usage: java org.apache.hadoop.hbase.avro.AvroServer " +
528 "--help | [--port=PORT] start");
529 System.out.println("Arguments:");
530 System.out.println(" start Start Avro server");
531 System.out.println(" stop Stop Avro server");
532 System.out.println("Options:");
533 System.out.println(" port Port to listen on. Default: 9090");
534 System.out.println(" help Print this message and exit");
535 System.exit(0);
536 }
537
538
539 protected static void doMain(final String[] args) throws Exception {
540 if (args.length < 1) {
541 printUsageAndExit();
542 }
543 int port = 9090;
544 final String portArgKey = "--port=";
545 for (String cmd: args) {
546 if (cmd.startsWith(portArgKey)) {
547 port = Integer.parseInt(cmd.substring(portArgKey.length()));
548 continue;
549 } else if (cmd.equals("--help") || cmd.equals("-h")) {
550 printUsageAndExit();
551 } else if (cmd.equals("start")) {
552 continue;
553 } else if (cmd.equals("stop")) {
554 printUsageAndExit("To shutdown the Avro server run " +
555 "bin/hbase-daemon.sh stop avro or send a kill signal to " +
556 "the Avro server pid");
557 }
558
559
560 printUsageAndExit();
561 }
562 Log LOG = LogFactory.getLog("AvroServer");
563 LOG.info("starting HBase Avro server on port " + Integer.toString(port));
564 SpecificResponder r = new SpecificResponder(HBase.class, new HBaseImpl());
565 new HttpServer(r, 9090);
566 Thread.sleep(1000000);
567 }
568
569
570
571
572
573 public static void main(String[] args) throws Exception {
574 doMain(args);
575 }
576 }