1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.rest.client;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.Collection;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 import java.util.TreeMap;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34
35 import org.apache.hadoop.util.StringUtils;
36
37 import org.apache.hadoop.conf.Configuration;
38 import org.apache.hadoop.hbase.HBaseConfiguration;
39 import org.apache.hadoop.hbase.HConstants;
40 import org.apache.hadoop.hbase.HTableDescriptor;
41 import org.apache.hadoop.hbase.KeyValue;
42 import org.apache.hadoop.hbase.client.Delete;
43 import org.apache.hadoop.hbase.client.Get;
44 import org.apache.hadoop.hbase.client.HTableInterface;
45 import org.apache.hadoop.hbase.client.Increment;
46 import org.apache.hadoop.hbase.client.Put;
47 import org.apache.hadoop.hbase.client.Row;
48 import org.apache.hadoop.hbase.client.Result;
49 import org.apache.hadoop.hbase.client.ResultScanner;
50 import org.apache.hadoop.hbase.client.RowLock;
51 import org.apache.hadoop.hbase.client.Scan;
52 import org.apache.hadoop.hbase.io.TimeRange;
53 import org.apache.hadoop.hbase.rest.Constants;
54 import org.apache.hadoop.hbase.rest.model.CellModel;
55 import org.apache.hadoop.hbase.rest.model.CellSetModel;
56 import org.apache.hadoop.hbase.rest.model.RowModel;
57 import org.apache.hadoop.hbase.rest.model.ScannerModel;
58 import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
59 import org.apache.hadoop.hbase.util.Bytes;
60
61
62
63
64 public class RemoteHTable implements HTableInterface {
65
66 private static final Log LOG = LogFactory.getLog(RemoteHTable.class);
67
68 final Client client;
69 final Configuration conf;
70 final byte[] name;
71 final String accessToken;
72 final int maxRetries;
73 final long sleepTime;
74
75 @SuppressWarnings("unchecked")
76 protected String buildRowSpec(final byte[] row, final Map familyMap,
77 final long startTime, final long endTime, final int maxVersions) {
78 StringBuffer sb = new StringBuffer();
79 sb.append('/');
80 if (accessToken != null) {
81 sb.append(accessToken);
82 sb.append('/');
83 }
84 sb.append(Bytes.toStringBinary(name));
85 sb.append('/');
86 sb.append(Bytes.toStringBinary(row));
87 Set families = familyMap.entrySet();
88 if (families != null) {
89 Iterator i = familyMap.entrySet().iterator();
90 if (i.hasNext()) {
91 sb.append('/');
92 }
93 while (i.hasNext()) {
94 Map.Entry e = (Map.Entry)i.next();
95 Collection quals = (Collection)e.getValue();
96 if (quals != null && !quals.isEmpty()) {
97 Iterator ii = quals.iterator();
98 while (ii.hasNext()) {
99 sb.append(Bytes.toStringBinary((byte[])e.getKey()));
100 sb.append(':');
101 Object o = ii.next();
102
103 if (o instanceof byte[]) {
104 sb.append(Bytes.toStringBinary((byte[])o));
105 } else if (o instanceof KeyValue) {
106 sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier()));
107 } else {
108 throw new RuntimeException("object type not handled");
109 }
110 if (ii.hasNext()) {
111 sb.append(',');
112 }
113 }
114 } else {
115 sb.append(Bytes.toStringBinary((byte[])e.getKey()));
116 sb.append(':');
117 }
118 if (i.hasNext()) {
119 sb.append(',');
120 }
121 }
122 }
123 if (startTime != 0 && endTime != Long.MAX_VALUE) {
124 sb.append('/');
125 sb.append(startTime);
126 if (startTime != endTime) {
127 sb.append(',');
128 sb.append(endTime);
129 }
130 } else if (endTime != Long.MAX_VALUE) {
131 sb.append('/');
132 sb.append(endTime);
133 }
134 if (maxVersions > 1) {
135 sb.append("?v=");
136 sb.append(maxVersions);
137 }
138 return sb.toString();
139 }
140
141 protected Result[] buildResultFromModel(final CellSetModel model) {
142 List<Result> results = new ArrayList<Result>();
143 for (RowModel row: model.getRows()) {
144 List<KeyValue> kvs = new ArrayList<KeyValue>();
145 for (CellModel cell: row.getCells()) {
146 byte[][] split = KeyValue.parseColumn(cell.getColumn());
147 byte[] column = split[0];
148 byte[] qualifier = split.length > 1 ? split[1] : null;
149 kvs.add(new KeyValue(row.getKey(), column, qualifier,
150 cell.getTimestamp(), cell.getValue()));
151 }
152 results.add(new Result(kvs));
153 }
154 return results.toArray(new Result[results.size()]);
155 }
156
157 protected CellSetModel buildModelFromPut(Put put) {
158 RowModel row = new RowModel(put.getRow());
159 long ts = put.getTimeStamp();
160 for (List<KeyValue> kvs: put.getFamilyMap().values()) {
161 for (KeyValue kv: kvs) {
162 row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
163 ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
164 kv.getValue()));
165 }
166 }
167 CellSetModel model = new CellSetModel();
168 model.addRow(row);
169 return model;
170 }
171
172
173
174
175
176
177 public RemoteHTable(Client client, String name) {
178 this(client, HBaseConfiguration.create(), Bytes.toBytes(name), null);
179 }
180
181
182
183
184
185
186
187 public RemoteHTable(Client client, String name, String accessToken) {
188 this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
189 }
190
191
192
193
194
195
196
197
198 public RemoteHTable(Client client, Configuration conf, String name,
199 String accessToken) {
200 this(client, conf, Bytes.toBytes(name), accessToken);
201 }
202
203
204
205
206
207 public RemoteHTable(Client client, Configuration conf, byte[] name,
208 String accessToken) {
209 this.client = client;
210 this.conf = conf;
211 this.name = name;
212 this.accessToken = accessToken;
213 this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
214 this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
215 }
216
217 public byte[] getTableName() {
218 return name.clone();
219 }
220
221 public Configuration getConfiguration() {
222 return conf;
223 }
224
225 public HTableDescriptor getTableDescriptor() throws IOException {
226 StringBuilder sb = new StringBuilder();
227 sb.append('/');
228 if (accessToken != null) {
229 sb.append(accessToken);
230 sb.append('/');
231 }
232 sb.append(Bytes.toStringBinary(name));
233 sb.append('/');
234 sb.append("schema");
235 for (int i = 0; i < maxRetries; i++) {
236 Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
237 int code = response.getCode();
238 switch (code) {
239 case 200:
240 TableSchemaModel schema = new TableSchemaModel();
241 schema.getObjectFromMessage(response.getBody());
242 return schema.getTableDescriptor();
243 case 509:
244 try {
245 Thread.sleep(sleepTime);
246 } catch (InterruptedException e) { }
247 break;
248 default:
249 throw new IOException("schema request returned " + code);
250 }
251 }
252 throw new IOException("schema request timed out");
253 }
254
255 public void close() throws IOException {
256 client.shutdown();
257 }
258
259 public Result get(Get get) throws IOException {
260 TimeRange range = get.getTimeRange();
261 String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
262 range.getMin(), range.getMax(), get.getMaxVersions());
263 if (get.getFilter() != null) {
264 LOG.warn("filters not supported on gets");
265 }
266 for (int i = 0; i < maxRetries; i++) {
267 Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
268 int code = response.getCode();
269 switch (code) {
270 case 200:
271 CellSetModel model = new CellSetModel();
272 model.getObjectFromMessage(response.getBody());
273 Result[] results = buildResultFromModel(model);
274 if (results.length > 0) {
275 if (results.length > 1) {
276 LOG.warn("too many results for get (" + results.length + ")");
277 }
278 return results[0];
279 }
280
281 case 404:
282 return new Result();
283 case 509:
284 try {
285 Thread.sleep(sleepTime);
286 } catch (InterruptedException e) { }
287 break;
288 default:
289 throw new IOException("get request returned " + code);
290 }
291 }
292 throw new IOException("get request timed out");
293 }
294
295 public boolean exists(Get get) throws IOException {
296 LOG.warn("exists() is really get(), just use get()");
297 Result result = get(get);
298 return (result != null && !(result.isEmpty()));
299 }
300
301 public void put(Put put) throws IOException {
302 CellSetModel model = buildModelFromPut(put);
303 StringBuilder sb = new StringBuilder();
304 sb.append('/');
305 if (accessToken != null) {
306 sb.append(accessToken);
307 sb.append('/');
308 }
309 sb.append(Bytes.toStringBinary(name));
310 sb.append('/');
311 sb.append(Bytes.toStringBinary(put.getRow()));
312 for (int i = 0; i < maxRetries; i++) {
313 Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
314 model.createProtobufOutput());
315 int code = response.getCode();
316 switch (code) {
317 case 200:
318 return;
319 case 509:
320 try {
321 Thread.sleep(sleepTime);
322 } catch (InterruptedException e) { }
323 break;
324 default:
325 throw new IOException("put request failed with " + code);
326 }
327 }
328 throw new IOException("put request timed out");
329 }
330
331 public void put(List<Put> puts) throws IOException {
332
333
334
335
336 TreeMap<byte[],List<KeyValue>> map =
337 new TreeMap<byte[],List<KeyValue>>(Bytes.BYTES_COMPARATOR);
338 for (Put put: puts) {
339 byte[] row = put.getRow();
340 List<KeyValue> kvs = map.get(row);
341 if (kvs == null) {
342 kvs = new ArrayList<KeyValue>();
343 map.put(row, kvs);
344 }
345 for (List<KeyValue> l: put.getFamilyMap().values()) {
346 kvs.addAll(l);
347 }
348 }
349
350
351 CellSetModel model = new CellSetModel();
352 for (Map.Entry<byte[], List<KeyValue>> e: map.entrySet()) {
353 RowModel row = new RowModel(e.getKey());
354 for (KeyValue kv: e.getValue()) {
355 row.addCell(new CellModel(kv));
356 }
357 model.addRow(row);
358 }
359
360
361 StringBuilder sb = new StringBuilder();
362 sb.append('/');
363 if (accessToken != null) {
364 sb.append(accessToken);
365 sb.append('/');
366 }
367 sb.append(Bytes.toStringBinary(name));
368 sb.append("/$multiput");
369 for (int i = 0; i < maxRetries; i++) {
370 Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
371 model.createProtobufOutput());
372 int code = response.getCode();
373 switch (code) {
374 case 200:
375 return;
376 case 509:
377 try {
378 Thread.sleep(sleepTime);
379 } catch (InterruptedException e) { }
380 break;
381 default:
382 throw new IOException("multiput request failed with " + code);
383 }
384 }
385 throw new IOException("multiput request timed out");
386 }
387
388 public void delete(Delete delete) throws IOException {
389 String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(),
390 delete.getTimeStamp(), delete.getTimeStamp(), 1);
391 for (int i = 0; i < maxRetries; i++) {
392 Response response = client.delete(spec);
393 int code = response.getCode();
394 switch (code) {
395 case 200:
396 return;
397 case 509:
398 try {
399 Thread.sleep(sleepTime);
400 } catch (InterruptedException e) { }
401 break;
402 default:
403 throw new IOException("delete request failed with " + code);
404 }
405 }
406 throw new IOException("delete request timed out");
407 }
408
409 public void delete(List<Delete> deletes) throws IOException {
410 for (Delete delete: deletes) {
411 delete(delete);
412 }
413 }
414
415 public void flushCommits() throws IOException {
416
417 }
418
419 class Scanner implements ResultScanner {
420
421 String uri;
422
423 public Scanner(Scan scan) throws IOException {
424 ScannerModel model;
425 try {
426 model = ScannerModel.fromScan(scan);
427 } catch (Exception e) {
428 throw new IOException(e);
429 }
430 StringBuffer sb = new StringBuffer();
431 sb.append('/');
432 if (accessToken != null) {
433 sb.append(accessToken);
434 sb.append('/');
435 }
436 sb.append(Bytes.toStringBinary(name));
437 sb.append('/');
438 sb.append("scanner");
439 for (int i = 0; i < maxRetries; i++) {
440 Response response = client.post(sb.toString(),
441 Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
442 int code = response.getCode();
443 switch (code) {
444 case 201:
445 uri = response.getLocation();
446 return;
447 case 509:
448 try {
449 Thread.sleep(sleepTime);
450 } catch (InterruptedException e) { }
451 break;
452 default:
453 throw new IOException("scan request failed with " + code);
454 }
455 }
456 throw new IOException("scan request timed out");
457 }
458
459 @Override
460 public Result[] next(int nbRows) throws IOException {
461 StringBuilder sb = new StringBuilder(uri);
462 sb.append("?n=");
463 sb.append(nbRows);
464 for (int i = 0; i < maxRetries; i++) {
465 Response response = client.get(sb.toString(),
466 Constants.MIMETYPE_PROTOBUF);
467 int code = response.getCode();
468 switch (code) {
469 case 200:
470 CellSetModel model = new CellSetModel();
471 model.getObjectFromMessage(response.getBody());
472 return buildResultFromModel(model);
473 case 204:
474 case 206:
475 return null;
476 case 509:
477 try {
478 Thread.sleep(sleepTime);
479 } catch (InterruptedException e) { }
480 break;
481 default:
482 throw new IOException("scanner.next request failed with " + code);
483 }
484 }
485 throw new IOException("scanner.next request timed out");
486 }
487
488 @Override
489 public Result next() throws IOException {
490 Result[] results = next(1);
491 if (results == null || results.length < 1) {
492 return null;
493 }
494 return results[0];
495 }
496
497 class Iter implements Iterator<Result> {
498
499 Result cache;
500
501 public Iter() {
502 try {
503 cache = Scanner.this.next();
504 } catch (IOException e) {
505 LOG.warn(StringUtils.stringifyException(e));
506 }
507 }
508
509 @Override
510 public boolean hasNext() {
511 return cache != null;
512 }
513
514 @Override
515 public Result next() {
516 Result result = cache;
517 try {
518 cache = Scanner.this.next();
519 } catch (IOException e) {
520 LOG.warn(StringUtils.stringifyException(e));
521 cache = null;
522 }
523 return result;
524 }
525
526 @Override
527 public void remove() {
528 throw new RuntimeException("remove() not supported");
529 }
530
531 }
532
533 @Override
534 public Iterator<Result> iterator() {
535 return new Iter();
536 }
537
538 @Override
539 public void close() {
540 try {
541 client.delete(uri);
542 } catch (IOException e) {
543 LOG.warn(StringUtils.stringifyException(e));
544 }
545 }
546
547 }
548
549 public ResultScanner getScanner(Scan scan) throws IOException {
550 return new Scanner(scan);
551 }
552
553 public ResultScanner getScanner(byte[] family) throws IOException {
554 Scan scan = new Scan();
555 scan.addFamily(family);
556 return new Scanner(scan);
557 }
558
559 public ResultScanner getScanner(byte[] family, byte[] qualifier)
560 throws IOException {
561 Scan scan = new Scan();
562 scan.addColumn(family, qualifier);
563 return new Scanner(scan);
564 }
565
566 public boolean isAutoFlush() {
567 return true;
568 }
569
570 public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
571 throw new IOException("getRowOrBefore not supported");
572 }
573
574 public RowLock lockRow(byte[] row) throws IOException {
575 throw new IOException("lockRow not implemented");
576 }
577
578 public void unlockRow(RowLock rl) throws IOException {
579 throw new IOException("unlockRow not implemented");
580 }
581
582 public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
583 byte[] value, Put put) throws IOException {
584 throw new IOException("checkAndPut not supported");
585 }
586
587 public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
588 byte[] value, Delete delete) throws IOException {
589 throw new IOException("checkAndDelete not supported");
590 }
591
592 public Result increment(Increment increment) throws IOException {
593 throw new IOException("Increment not supported");
594 }
595
596 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
597 long amount) throws IOException {
598 throw new IOException("incrementColumnValue not supported");
599 }
600
601 public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
602 long amount, boolean writeToWAL) throws IOException {
603 throw new IOException("incrementColumnValue not supported");
604 }
605
606 @Override
607 public void batch(List<Row> actions, Object[] results) throws IOException {
608 throw new IOException("batch not supported");
609 }
610
611 @Override
612 public Object[] batch(List<Row> actions) throws IOException {
613 throw new IOException("batch not supported");
614 }
615
616 @Override
617 public Result[] get(List<Get> gets) throws IOException {
618 throw new IOException("get(List<Get>) not supported");
619 }
620
621 }