1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.rest;
22
23 import java.io.IOException;
24 import java.util.Iterator;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.hbase.HColumnDescriptor;
29 import org.apache.hadoop.hbase.KeyValue;
30 import org.apache.hadoop.hbase.UnknownScannerException;
31 import org.apache.hadoop.hbase.client.HTableInterface;
32 import org.apache.hadoop.hbase.client.HTablePool;
33 import org.apache.hadoop.hbase.client.Result;
34 import org.apache.hadoop.hbase.client.ResultScanner;
35 import org.apache.hadoop.hbase.client.Scan;
36 import org.apache.hadoop.hbase.filter.Filter;
37 import org.apache.hadoop.hbase.rest.model.ScannerModel;
38 import org.apache.hadoop.util.StringUtils;
39
40 public class ScannerResultGenerator extends ResultGenerator {
41
42 private static final Log LOG =
43 LogFactory.getLog(ScannerResultGenerator.class);
44
45 public static Filter buildFilterFromModel(final ScannerModel model)
46 throws Exception {
47 String filter = model.getFilter();
48 if (filter == null || filter.length() == 0) {
49 return null;
50 }
51 return buildFilter(filter);
52 }
53
54 private String id;
55 private Iterator<KeyValue> rowI;
56 private KeyValue cache;
57 private ResultScanner scanner;
58 private Result cached;
59
60 public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
61 final Filter filter) throws IllegalArgumentException, IOException {
62 HTablePool pool = RESTServlet.getInstance().getTablePool();
63 HTableInterface table = pool.getTable(tableName);
64 try {
65 Scan scan;
66 if (rowspec.hasEndRow()) {
67 scan = new Scan(rowspec.getStartRow(), rowspec.getEndRow());
68 } else {
69 scan = new Scan(rowspec.getStartRow());
70 }
71 if (rowspec.hasColumns()) {
72 byte[][] columns = rowspec.getColumns();
73 for (byte[] column: columns) {
74 byte[][] split = KeyValue.parseColumn(column);
75 if (split.length > 1 && (split[1] != null && split[1].length != 0)) {
76 scan.addColumn(split[0], split[1]);
77 } else {
78 scan.addFamily(split[0]);
79 }
80 }
81 } else {
82 for (HColumnDescriptor family:
83 table.getTableDescriptor().getFamilies()) {
84 scan.addFamily(family.getName());
85 }
86 }
87 scan.setTimeRange(rowspec.getStartTime(), rowspec.getEndTime());
88 scan.setMaxVersions(rowspec.getMaxVersions());
89 if (filter != null) {
90 scan.setFilter(filter);
91 }
92
93 scan.setCacheBlocks(false);
94 scanner = table.getScanner(scan);
95 cached = null;
96 id = Long.toString(System.currentTimeMillis()) +
97 Integer.toHexString(scanner.hashCode());
98 } finally {
99 pool.putTable(table);
100 }
101 }
102
103 public String getID() {
104 return id;
105 }
106
107 public void close() {
108 }
109
110 public boolean hasNext() {
111 if (cache != null) {
112 return true;
113 }
114 if (rowI != null && rowI.hasNext()) {
115 return true;
116 }
117 if (cached != null) {
118 return true;
119 }
120 try {
121 Result result = scanner.next();
122 if (result != null && !result.isEmpty()) {
123 cached = result;
124 }
125 } catch (UnknownScannerException e) {
126 throw new IllegalArgumentException(e);
127 } catch (IOException e) {
128 LOG.error(StringUtils.stringifyException(e));
129 }
130 return cached != null;
131 }
132
133 public KeyValue next() {
134 if (cache != null) {
135 KeyValue kv = cache;
136 cache = null;
137 return kv;
138 }
139 boolean loop;
140 do {
141 loop = false;
142 if (rowI != null) {
143 if (rowI.hasNext()) {
144 return rowI.next();
145 } else {
146 rowI = null;
147 }
148 }
149 if (cached != null) {
150 rowI = cached.list().iterator();
151 loop = true;
152 cached = null;
153 } else {
154 Result result = null;
155 try {
156 result = scanner.next();
157 } catch (UnknownScannerException e) {
158 throw new IllegalArgumentException(e);
159 } catch (IOException e) {
160 LOG.error(StringUtils.stringifyException(e));
161 }
162 if (result != null && !result.isEmpty()) {
163 rowI = result.list().iterator();
164 loop = true;
165 }
166 }
167 } while (loop);
168 return null;
169 }
170
171 public void putBack(KeyValue kv) {
172 this.cache = kv;
173 }
174
175 public void remove() {
176 throw new UnsupportedOperationException("remove not supported");
177 }
178 }