1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.*;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.filter.Filter;
34 import org.apache.hadoop.hbase.filter.TimestampsFilter;
35 import org.apache.hadoop.hbase.util.Bytes;
36 import org.junit.After;
37 import org.junit.AfterClass;
38 import org.junit.Before;
39 import org.junit.BeforeClass;
40 import org.junit.Test;
41
42
43
44
45
46
47 public class TestTimestampsFilter {
48 final Log LOG = LogFactory.getLog(getClass());
49 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
50
51
52
53
54 @BeforeClass
55 public static void setUpBeforeClass() throws Exception {
56 TEST_UTIL.startMiniCluster(3);
57 }
58
59
60
61
62 @AfterClass
63 public static void tearDownAfterClass() throws Exception {
64 TEST_UTIL.shutdownMiniCluster();
65 }
66
67
68
69
70 @Before
71 public void setUp() throws Exception {
72
73 }
74
75
76
77
78 @After
79 public void tearDown() throws Exception {
80
81 }
82
83
84
85
86
87
88
89
90
91 @Test
92 public void testTimestampsFilter() throws Exception {
93 byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
94 byte [] FAMILY = Bytes.toBytes("event_log");
95 byte [][] FAMILIES = new byte[][] { FAMILY };
96 KeyValue kvs[];
97
98
99 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
100
101 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
102 for (int colIdx = 0; colIdx < 5; colIdx++) {
103
104 putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
105
106 putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
107 }
108 }
109
110
111 verifyInsertedValues(ht, FAMILY);
112
113 flush();
114
115
116 verifyInsertedValues(ht, FAMILY);
117
118
119
120 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
121 for (int colIdx = 0; colIdx < 5; colIdx++) {
122 putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
123 putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
124 }
125 }
126
127 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
128 for (int colIdx = 0; colIdx < 5; colIdx++) {
129 kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
130 Arrays.asList(505L, 5L, 105L, 305L, 205L));
131 assertEquals(4, kvs.length);
132 checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
133 checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
134 checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
135 checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
136 }
137 }
138
139
140
141 kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
142 assertEquals(0, kvs.length);
143
144
145
146
147
148 Result[] results = scanNVersions(ht, FAMILY, 0, 4,
149 Arrays.asList(6L, 106L, 306L));
150 assertEquals("# of rows returned from scan", 5, results.length);
151 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
152 kvs = results[rowIdx].raw();
153
154
155 assertEquals("Number of KeyValues in result for row:" + rowIdx,
156 3*5, kvs.length);
157 for (int colIdx = 0; colIdx < 5; colIdx++) {
158 int offset = colIdx * 3;
159 checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
160 checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
161 checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
162 }
163 }
164 }
165
166 @Test
167 public void testMultiColumns() throws Exception {
168 byte [] TABLE = Bytes.toBytes("testTimestampsFilterMultiColumns");
169 byte [] FAMILY = Bytes.toBytes("event_log");
170 byte [][] FAMILIES = new byte[][] { FAMILY };
171 KeyValue kvs[];
172
173
174 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
175
176 Put p = new Put(Bytes.toBytes("row"));
177 p.add(FAMILY, Bytes.toBytes("column0"), 3, Bytes.toBytes("value0-3"));
178 p.add(FAMILY, Bytes.toBytes("column1"), 3, Bytes.toBytes("value1-3"));
179 p.add(FAMILY, Bytes.toBytes("column2"), 1, Bytes.toBytes("value2-1"));
180 p.add(FAMILY, Bytes.toBytes("column2"), 2, Bytes.toBytes("value2-2"));
181 p.add(FAMILY, Bytes.toBytes("column2"), 3, Bytes.toBytes("value2-3"));
182 p.add(FAMILY, Bytes.toBytes("column3"), 2, Bytes.toBytes("value3-2"));
183 p.add(FAMILY, Bytes.toBytes("column4"), 1, Bytes.toBytes("value4-1"));
184 p.add(FAMILY, Bytes.toBytes("column4"), 2, Bytes.toBytes("value4-2"));
185 p.add(FAMILY, Bytes.toBytes("column4"), 3, Bytes.toBytes("value4-3"));
186 ht.put(p);
187
188 ArrayList timestamps = new ArrayList();
189 timestamps.add(new Long(3));
190 TimestampsFilter filter = new TimestampsFilter(timestamps);
191
192 Get g = new Get(Bytes.toBytes("row"));
193 g.setFilter(filter);
194 g.setMaxVersions();
195 g.addColumn(FAMILY, Bytes.toBytes("column2"));
196 g.addColumn(FAMILY, Bytes.toBytes("column4"));
197
198 Result result = ht.get(g);
199 for (KeyValue kv : result.list()) {
200 System.out.println("found row " + Bytes.toString(kv.getRow()) +
201 ", column " + Bytes.toString(kv.getQualifier()) + ", value "
202 + Bytes.toString(kv.getValue()));
203 }
204
205 assertEquals(result.list().size(), 2);
206 assertEquals(Bytes.toString(result.list().get(0).getValue()),
207 "value2-3");
208 assertEquals(Bytes.toString(result.list().get(1).getValue()),
209 "value4-3");
210 }
211
212
213
214
215
216
217 @Test
218 public void testWithVersionDeletes() throws Exception {
219
220
221 testWithVersionDeletes(false);
222
223
224 testWithVersionDeletes(true);
225 }
226
227 private void testWithVersionDeletes(boolean flushTables) throws IOException {
228 byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
229 (flushTables ? "flush" : "noflush"));
230 byte [] FAMILY = Bytes.toBytes("event_log");
231 byte [][] FAMILIES = new byte[][] { FAMILY };
232
233
234 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
235
236
237 putNVersions(ht, FAMILY, 0, 0, 1, 5);
238
239
240 deleteOneVersion(ht, FAMILY, 0, 0, 4);
241
242 if (flushTables) {
243 flush();
244 }
245
246
247
248 KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
249 assertEquals(3, kvs.length);
250 checkOneCell(kvs[0], FAMILY, 0, 0, 5);
251 checkOneCell(kvs[1], FAMILY, 0, 0, 3);
252 checkOneCell(kvs[2], FAMILY, 0, 0, 2);
253 }
254
255 private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
256 for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
257 for (int colIdx = 0; colIdx < 5; colIdx++) {
258
259 KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
260 Arrays.asList(5L, 300L, 6L, 80L));
261 assertEquals(4, kvs.length);
262 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
263 checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
264 checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
265 checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
266
267
268 kvs = getNVersions(ht, cf, rowIdx, colIdx,
269 Arrays.asList(101L, 102L));
270 assertEquals(0, kvs.length);
271
272
273 kvs = getNVersions(ht, cf, rowIdx, colIdx,
274 Arrays.asList(1L, 300L, 105L, 70L, 115L));
275 assertEquals(3, kvs.length);
276 checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
277 checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
278 checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
279 }
280 }
281 }
282
283
284 private void flush() throws IOException {
285 TEST_UTIL.flush();
286 try {
287 Thread.sleep(3000);
288 } catch (InterruptedException i) {
289
290 }
291 }
292
293
294
295
296
297 private void checkOneCell(KeyValue kv, byte[] cf,
298 int rowIdx, int colIdx, long ts) {
299
300 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
301
302 assertEquals("Row mismatch which checking: " + ctx,
303 "row:"+ rowIdx, Bytes.toString(kv.getRow()));
304
305 assertEquals("ColumnFamily mismatch while checking: " + ctx,
306 Bytes.toString(cf), Bytes.toString(kv.getFamily()));
307
308 assertEquals("Column qualifier mismatch while checking: " + ctx,
309 "column:" + colIdx,
310 Bytes.toString(kv.getQualifier()));
311
312 assertEquals("Timestamp mismatch while checking: " + ctx,
313 ts, kv.getTimestamp());
314
315 assertEquals("Value mismatch while checking: " + ctx,
316 "value-version-" + ts, Bytes.toString(kv.getValue()));
317 }
318
319
320
321
322
323
324 private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
325 int colIdx, List<Long> versions)
326 throws IOException {
327 byte row[] = Bytes.toBytes("row:" + rowIdx);
328 byte column[] = Bytes.toBytes("column:" + colIdx);
329 Filter filter = new TimestampsFilter(versions);
330 Get get = new Get(row);
331 get.addColumn(cf, column);
332 get.setFilter(filter);
333 get.setMaxVersions();
334 Result result = ht.get(get);
335
336 return result.raw();
337 }
338
339
340
341
342
343 private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
344 int endRowIdx, List<Long> versions)
345 throws IOException {
346 byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
347 byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1);
348 Filter filter = new TimestampsFilter(versions);
349 Scan scan = new Scan(startRow, endRow);
350 scan.setFilter(filter);
351 scan.setMaxVersions();
352 ResultScanner scanner = ht.getScanner(scan);
353 return scanner.next(endRowIdx - startRowIdx + 1);
354 }
355
356
357
358
359
360 private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
361 long versionStart, long versionEnd)
362 throws IOException {
363 byte row[] = Bytes.toBytes("row:" + rowIdx);
364 byte column[] = Bytes.toBytes("column:" + colIdx);
365 Put put = new Put(row);
366
367 for (long idx = versionStart; idx <= versionEnd; idx++) {
368 put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
369 }
370
371 ht.put(put);
372 }
373
374
375
376
377
378 private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
379 int colIdx, long version)
380 throws IOException {
381 byte row[] = Bytes.toBytes("row:" + rowIdx);
382 byte column[] = Bytes.toBytes("column:" + colIdx);
383 Delete del = new Delete(row);
384 del.deleteColumn(cf, column, version);
385 ht.delete(del);
386 }
387 }
388