View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.rest.client;
22  
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.util.StringUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.rest.Constants;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.rest.model.ScannerModel;
import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
60  
61  /**
62   * HTable interface to remote tables accessed via REST gateway
63   */
64  public class RemoteHTable implements HTableInterface {
65  
  /** Logger shared by all instances of this class. */
  private static final Log LOG = LogFactory.getLog(RemoteHTable.class);

  /** REST client through which every request of this table is issued. */
  final Client client;
  /** Configuration handed to the constructor; source of the retry settings. */
  final Configuration conf;
  /** Table name; rendered into request paths with Bytes.toStringBinary. */
  final byte[] name;
  /** Optional leading path segment; left out of request paths when null. */
  final String accessToken;
  /** Upper bound on attempts per request ("hbase.rest.client.max.retries", default 10). */
  final int maxRetries;
  /** Milliseconds slept after a 509 response ("hbase.rest.client.sleep", default 1000). */
  final long sleepTime;
74  
75    @SuppressWarnings("unchecked")
76    protected String buildRowSpec(final byte[] row, final Map familyMap, 
77        final long startTime, final long endTime, final int maxVersions) {
78      StringBuffer sb = new StringBuffer();
79      sb.append('/');
80      if (accessToken != null) {
81        sb.append(accessToken);
82        sb.append('/');
83      }
84      sb.append(Bytes.toStringBinary(name));
85      sb.append('/');
86      sb.append(Bytes.toStringBinary(row));
87      Set families = familyMap.entrySet();
88      if (families != null) {
89        Iterator i = familyMap.entrySet().iterator();
90        if (i.hasNext()) {
91          sb.append('/');
92        }
93        while (i.hasNext()) {
94          Map.Entry e = (Map.Entry)i.next();
95          Collection quals = (Collection)e.getValue();
96          if (quals != null && !quals.isEmpty()) {
97            Iterator ii = quals.iterator();
98            while (ii.hasNext()) {
99              sb.append(Bytes.toStringBinary((byte[])e.getKey()));
100             sb.append(':');
101             Object o = ii.next();
102             // Puts use byte[] but Deletes use KeyValue
103             if (o instanceof byte[]) {
104               sb.append(Bytes.toStringBinary((byte[])o));
105             } else if (o instanceof KeyValue) {
106               sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier()));
107             } else {
108               throw new RuntimeException("object type not handled");
109             }
110             if (ii.hasNext()) {
111               sb.append(',');
112             }
113           }
114         } else {
115           sb.append(Bytes.toStringBinary((byte[])e.getKey()));
116           sb.append(':');
117         }
118         if (i.hasNext()) {
119           sb.append(',');
120         }
121       }
122     }
123     if (startTime != 0 && endTime != Long.MAX_VALUE) {
124       sb.append('/');
125       sb.append(startTime);
126       if (startTime != endTime) {
127         sb.append(',');
128         sb.append(endTime);
129       }
130     } else if (endTime != Long.MAX_VALUE) {
131       sb.append('/');
132       sb.append(endTime);
133     }
134     if (maxVersions > 1) {
135       sb.append("?v=");
136       sb.append(maxVersions);
137     }
138     return sb.toString();
139   }
140 
141   protected Result[] buildResultFromModel(final CellSetModel model) {
142     List<Result> results = new ArrayList<Result>();
143     for (RowModel row: model.getRows()) {
144       List<KeyValue> kvs = new ArrayList<KeyValue>();
145       for (CellModel cell: row.getCells()) {
146         byte[][] split = KeyValue.parseColumn(cell.getColumn());
147         byte[] column = split[0];
148         byte[] qualifier = split.length > 1 ? split[1] : null;
149         kvs.add(new KeyValue(row.getKey(), column, qualifier, 
150           cell.getTimestamp(), cell.getValue()));
151       }
152       results.add(new Result(kvs));
153     }
154     return results.toArray(new Result[results.size()]);
155   }
156 
157   protected CellSetModel buildModelFromPut(Put put) {
158     RowModel row = new RowModel(put.getRow());
159     long ts = put.getTimeStamp();
160     for (List<KeyValue> kvs: put.getFamilyMap().values()) {
161       for (KeyValue kv: kvs) {
162         row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
163           ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
164           kv.getValue()));
165       }
166     }
167     CellSetModel model = new CellSetModel();
168     model.addRow(row);
169     return model;
170   }
171 
  /**
   * Constructor. Uses a configuration obtained from
   * HBaseConfiguration.create() and no access token.
   * @param client the REST client used to issue requests
   * @param name the table name; converted with Bytes.toBytes
   */
  public RemoteHTable(Client client, String name) {
    this(client, HBaseConfiguration.create(), Bytes.toBytes(name), null);
  }
180 
  /**
   * Constructor. Uses a configuration obtained from
   * HBaseConfiguration.create().
   * @param client the REST client used to issue requests
   * @param name the table name; converted with Bytes.toBytes
   * @param accessToken path segment placed before the table name in every
   * request path, or null for none
   */
  public RemoteHTable(Client client, String name, String accessToken) {
    this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
  }
190 
  /**
   * Constructor
   * @param client the REST client used to issue requests
   * @param conf configuration from which the retry settings are read
   * @param name the table name; converted with Bytes.toBytes
   * @param accessToken path segment placed before the table name in every
   * request path, or null for none
   */
  public RemoteHTable(Client client, Configuration conf, String name,
      String accessToken) {
    this(client, conf, Bytes.toBytes(name), accessToken);
  }
202 
  /**
   * Constructor. All other constructors delegate to this one.
   * @param client the REST client used to issue requests
   * @param conf configuration; "hbase.rest.client.max.retries" (default 10)
   * and "hbase.rest.client.sleep" (default 1000) are read from it
   * @param name the table name; the array is stored as given, not copied
   * @param accessToken path segment placed before the table name in every
   * request path, or null for none
   */
  public RemoteHTable(Client client, Configuration conf, byte[] name,
      String accessToken) {
    this.client = client;
    this.conf = conf;
    this.name = name;
    this.accessToken = accessToken;
    this.maxRetries = conf.getInt("hbase.rest.client.max.retries", 10);
    this.sleepTime = conf.getLong("hbase.rest.client.sleep", 1000);
  }
216 
  /**
   * @return a copy of the table name, so callers cannot modify the array
   * held by this instance
   */
  public byte[] getTableName() {
    return name.clone();
  }
220 
  /**
   * @return the configuration instance given at construction (not a copy)
   */
  public Configuration getConfiguration() {
    return conf;
  }
224 
225   public HTableDescriptor getTableDescriptor() throws IOException {
226     StringBuilder sb = new StringBuilder();
227     sb.append('/');
228     if (accessToken != null) {
229       sb.append(accessToken);
230       sb.append('/');
231     }
232     sb.append(Bytes.toStringBinary(name));
233     sb.append('/');
234     sb.append("schema");
235     for (int i = 0; i < maxRetries; i++) {
236       Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
237       int code = response.getCode();
238       switch (code) {
239       case 200:
240         TableSchemaModel schema = new TableSchemaModel();
241         schema.getObjectFromMessage(response.getBody());
242         return schema.getTableDescriptor();
243       case 509: 
244         try {
245           Thread.sleep(sleepTime);
246         } catch (InterruptedException e) { }
247         break;
248       default:
249         throw new IOException("schema request returned " + code);
250       }
251     }
252     throw new IOException("schema request timed out");
253   }
254 
  /**
   * Shuts down the underlying REST client.
   * NOTE(review): the client is passed in by the caller and may be shared
   * with other tables; confirm closing one table should shut it down.
   */
  public void close() throws IOException {
    client.shutdown();
  }
258 
259   public Result get(Get get) throws IOException {
260     TimeRange range = get.getTimeRange();
261     String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
262       range.getMin(), range.getMax(), get.getMaxVersions());
263     if (get.getFilter() != null) {
264       LOG.warn("filters not supported on gets");
265     }
266     for (int i = 0; i < maxRetries; i++) {
267       Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
268       int code = response.getCode();
269       switch (code) {
270       case 200:
271         CellSetModel model = new CellSetModel();
272         model.getObjectFromMessage(response.getBody());
273         Result[] results = buildResultFromModel(model);
274         if (results.length > 0) {
275           if (results.length > 1) {
276             LOG.warn("too many results for get (" + results.length + ")");
277           }
278           return results[0];
279         }
280         // fall through
281       case 404:
282         return new Result();
283       case 509:
284         try {
285           Thread.sleep(sleepTime);
286         } catch (InterruptedException e) { }
287         break;
288       default:
289         throw new IOException("get request returned " + code);
290       }
291     }
292     throw new IOException("get request timed out");
293   }
294 
295   public boolean exists(Get get) throws IOException {
296     LOG.warn("exists() is really get(), just use get()");
297     Result result = get(get);
298     return (result != null && !(result.isEmpty()));
299   }
300 
301   public void put(Put put) throws IOException {
302     CellSetModel model = buildModelFromPut(put);
303     StringBuilder sb = new StringBuilder();
304     sb.append('/');
305     if (accessToken != null) {
306       sb.append(accessToken);
307       sb.append('/');      
308     }
309     sb.append(Bytes.toStringBinary(name));
310     sb.append('/');
311     sb.append(Bytes.toStringBinary(put.getRow()));
312     for (int i = 0; i < maxRetries; i++) {
313       Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
314         model.createProtobufOutput());
315       int code = response.getCode();
316       switch (code) {
317       case 200:
318         return;
319       case 509:
320         try {
321           Thread.sleep(sleepTime);
322         } catch (InterruptedException e) { }
323         break;
324       default:
325         throw new IOException("put request failed with " + code);
326       }
327     }
328     throw new IOException("put request timed out");
329   }
330 
331   public void put(List<Put> puts) throws IOException {
332     // this is a trick: The gateway accepts multiple rows in a cell set and
333     // ignores the row specification in the URI
334 
335     // separate puts by row
336     TreeMap<byte[],List<KeyValue>> map =
337       new TreeMap<byte[],List<KeyValue>>(Bytes.BYTES_COMPARATOR);
338     for (Put put: puts) {
339       byte[] row = put.getRow();
340       List<KeyValue> kvs = map.get(row);
341       if (kvs == null) {
342         kvs = new ArrayList<KeyValue>();
343         map.put(row, kvs);
344       }
345       for (List<KeyValue> l: put.getFamilyMap().values()) {
346         kvs.addAll(l);
347       }
348     }
349 
350     // build the cell set
351     CellSetModel model = new CellSetModel();
352     for (Map.Entry<byte[], List<KeyValue>> e: map.entrySet()) {
353       RowModel row = new RowModel(e.getKey());
354       for (KeyValue kv: e.getValue()) {
355         row.addCell(new CellModel(kv));
356       }
357       model.addRow(row);
358     }
359 
360     // build path for multiput
361     StringBuilder sb = new StringBuilder();
362     sb.append('/');
363     if (accessToken != null) {
364       sb.append(accessToken);
365       sb.append('/');      
366     }
367     sb.append(Bytes.toStringBinary(name));
368     sb.append("/$multiput"); // can be any nonexistent row
369     for (int i = 0; i < maxRetries; i++) {
370       Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
371         model.createProtobufOutput());
372       int code = response.getCode();
373       switch (code) {
374       case 200:
375         return;
376       case 509:
377         try {
378           Thread.sleep(sleepTime);
379         } catch (InterruptedException e) { }
380         break;
381       default:
382         throw new IOException("multiput request failed with " + code);
383       }
384     }
385     throw new IOException("multiput request timed out");
386   }
387 
388   public void delete(Delete delete) throws IOException {
389     String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(),
390       delete.getTimeStamp(), delete.getTimeStamp(), 1);
391     for (int i = 0; i < maxRetries; i++) {
392       Response response = client.delete(spec);
393       int code = response.getCode();
394       switch (code) {
395       case 200:
396         return;
397       case 509:
398         try {
399           Thread.sleep(sleepTime);
400         } catch (InterruptedException e) { }
401         break;
402       default:
403         throw new IOException("delete request failed with " + code);
404       }
405     }
406     throw new IOException("delete request timed out");
407   }
408 
409   public void delete(List<Delete> deletes) throws IOException {
410     for (Delete delete: deletes) {
411       delete(delete);
412     }
413   }
414 
  /**
   * Does nothing: the put methods of this class send their request
   * immediately, so there is nothing buffered to flush.
   */
  public void flushCommits() throws IOException {
    // no-op
  }
418 
419   class Scanner implements ResultScanner {
420 
421     String uri;
422 
423     public Scanner(Scan scan) throws IOException {
424       ScannerModel model;
425       try {
426         model = ScannerModel.fromScan(scan);
427       } catch (Exception e) {
428         throw new IOException(e);
429       }
430       StringBuffer sb = new StringBuffer();
431       sb.append('/');
432       if (accessToken != null) {
433         sb.append(accessToken);
434         sb.append('/');
435       }
436       sb.append(Bytes.toStringBinary(name));
437       sb.append('/');
438       sb.append("scanner");
439       for (int i = 0; i < maxRetries; i++) {
440         Response response = client.post(sb.toString(),
441           Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
442         int code = response.getCode();
443         switch (code) {
444         case 201:
445           uri = response.getLocation();
446           return;
447         case 509:
448           try {
449             Thread.sleep(sleepTime);
450           } catch (InterruptedException e) { }
451           break;
452         default:
453           throw new IOException("scan request failed with " + code);
454         }
455       }
456       throw new IOException("scan request timed out");
457     }
458 
459     @Override
460     public Result[] next(int nbRows) throws IOException {
461       StringBuilder sb = new StringBuilder(uri);
462       sb.append("?n=");
463       sb.append(nbRows);
464       for (int i = 0; i < maxRetries; i++) {
465         Response response = client.get(sb.toString(),
466           Constants.MIMETYPE_PROTOBUF);
467         int code = response.getCode();
468         switch (code) {
469         case 200:
470           CellSetModel model = new CellSetModel();
471           model.getObjectFromMessage(response.getBody());
472           return buildResultFromModel(model);
473         case 204:
474         case 206:
475           return null;
476         case 509:
477           try {
478             Thread.sleep(sleepTime);
479           } catch (InterruptedException e) { }
480           break;
481         default:
482           throw new IOException("scanner.next request failed with " + code);
483         }
484       }
485       throw new IOException("scanner.next request timed out");
486     }
487 
488     @Override
489     public Result next() throws IOException {
490       Result[] results = next(1);
491       if (results == null || results.length < 1) {
492         return null;
493       }
494       return results[0];
495     }
496     
497     class Iter implements Iterator<Result> {
498 
499       Result cache;
500 
501       public Iter() {
502         try {
503           cache = Scanner.this.next();
504         } catch (IOException e) {
505           LOG.warn(StringUtils.stringifyException(e));
506         }
507       }
508 
509       @Override
510       public boolean hasNext() {
511         return cache != null;
512       }
513 
514       @Override
515       public Result next() {
516         Result result = cache;
517         try {
518           cache = Scanner.this.next();
519         } catch (IOException e) {
520           LOG.warn(StringUtils.stringifyException(e));
521           cache = null;
522         }
523         return result;
524       }
525 
526       @Override
527       public void remove() {
528         throw new RuntimeException("remove() not supported");
529       }
530       
531     }
532 
533     @Override
534     public Iterator<Result> iterator() {
535       return new Iter();
536     }
537 
538     @Override
539     public void close() {
540       try {
541         client.delete(uri);
542       } catch (IOException e) {
543         LOG.warn(StringUtils.stringifyException(e));
544       }
545     }
546 
547   }
548 
  /**
   * Opens a scanner for the given scan; constructing the Scanner issues the
   * request that creates it on the gateway.
   *
   * @param scan the scan to run
   * @return a new scanner
   * @throws IOException if the scanner cannot be created
   */
  public ResultScanner getScanner(Scan scan) throws IOException {
    return new Scanner(scan);
  }
552 
553   public ResultScanner getScanner(byte[] family) throws IOException {
554     Scan scan = new Scan();
555     scan.addFamily(family);
556     return new Scanner(scan);
557   }
558 
559   public ResultScanner getScanner(byte[] family, byte[] qualifier)
560       throws IOException {
561     Scan scan = new Scan();
562     scan.addColumn(family, qualifier);
563     return new Scanner(scan);
564   }
565 
  /**
   * @return always true; the put methods of this class send their request
   * immediately rather than buffering
   */
  public boolean isAutoFlush() {
    return true;
  }
569 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
    throw new IOException("getRowOrBefore not supported");
  }
573 
  /**
   * Not implemented by this implementation.
   * @throws IOException always
   */
  public RowLock lockRow(byte[] row) throws IOException {
    throw new IOException("lockRow not implemented");
  }
577 
  /**
   * Not implemented by this implementation.
   * @throws IOException always
   */
  public void unlockRow(RowLock rl) throws IOException {
    throw new IOException("unlockRow not implemented");
  }
581 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
      byte[] value, Put put) throws IOException {
    throw new IOException("checkAndPut not supported");
  }
586 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
      byte[] value, Delete delete) throws IOException {
    throw new IOException("checkAndDelete not supported");
  }
591 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public Result increment(Increment increment) throws IOException {
    throw new IOException("Increment not supported");
  }
595 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount) throws IOException {
    throw new IOException("incrementColumnValue not supported");
  }
600 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
      long amount, boolean writeToWAL) throws IOException {
    throw new IOException("incrementColumnValue not supported");
  }
605 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  @Override
  public void batch(List<Row> actions, Object[] results) throws IOException {
    throw new IOException("batch not supported");
  }
610 
  /**
   * Not supported by this implementation.
   * @throws IOException always
   */
  @Override
  public Object[] batch(List<Row> actions) throws IOException {
    throw new IOException("batch not supported");
  }
615 
  /**
   * Not supported by this implementation; only the single-Get variant is.
   * @throws IOException always
   */
  @Override
  public Result[] get(List<Get> gets) throws IOException {
    throw new IOException("get(List<Get>) not supported");
  }
620 
621 }