1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.hadoop.hbase.client;
21
22 import static org.junit.Assert.*;
23
24 import java.io.IOException;
25 import java.util.Arrays;
26 import java.util.Collections;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.hbase.HBaseTestingUtility;
32 import org.apache.hadoop.hbase.KeyValue;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.junit.After;
35 import org.junit.AfterClass;
36 import org.junit.Before;
37 import org.junit.BeforeClass;
38 import org.junit.Test;
39
40
41
42
43
44
45 public class TestMultipleTimestamps {
46 final Log LOG = LogFactory.getLog(getClass());
47 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
48
49
50
51
52 @BeforeClass
53 public static void setUpBeforeClass() throws Exception {
54 TEST_UTIL.startMiniCluster(3);
55 }
56
57
58
59
60 @AfterClass
61 public static void tearDownAfterClass() throws Exception {
62 TEST_UTIL.shutdownMiniCluster();
63 }
64
65
66
67
68 @Before
69 public void setUp() throws Exception {
70
71 }
72
73
74
75
76 @After
77 public void tearDown() throws Exception {
78
79 }
80
81 @Test
82 public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
83 byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
84 "ColumnMiltipleTimestamps");
85 byte [] FAMILY = Bytes.toBytes("event_log");
86 byte [][] FAMILIES = new byte[][] { FAMILY };
87
88
89 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
90
91 Integer[] putRows = new Integer[] {1, 3, 5, 7};
92 Integer[] putColumns = new Integer[] { 1, 3, 5};
93 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
94
95 Integer[] scanRows = new Integer[] {3, 5};
96 Integer[] scanColumns = new Integer[] {3};
97 Long[] scanTimestamps = new Long[] {3L, 4L};
98 int scanMaxVersions = 2;
99
100 put(ht, FAMILY, putRows, putColumns, putTimestamps);
101
102 flush(TABLE);
103
104 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
105 scanTimestamps, scanMaxVersions);
106
107 KeyValue[] kvs;
108
109 kvs = scanner.next().raw();
110 assertEquals(2, kvs.length);
111 checkOneCell(kvs[0], FAMILY, 3, 3, 4);
112 checkOneCell(kvs[1], FAMILY, 3, 3, 3);
113 kvs = scanner.next().raw();
114 assertEquals(2, kvs.length);
115 checkOneCell(kvs[0], FAMILY, 5, 3, 4);
116 checkOneCell(kvs[1], FAMILY, 5, 3, 3);
117 }
118
119 @Test
120 public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
121 byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
122 "ColumnOneTimestamps");
123 byte [] FAMILY = Bytes.toBytes("event_log");
124 byte [][] FAMILIES = new byte[][] { FAMILY };
125
126
127 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
128
129 Integer[] putRows = new Integer[] {1, 3, 5, 7};
130 Integer[] putColumns = new Integer[] { 1, 3, 5};
131 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
132
133 Integer[] scanRows = new Integer[] {3, 5};
134 Integer[] scanColumns = new Integer[] {3,4};
135 Long[] scanTimestamps = new Long[] {3L};
136 int scanMaxVersions = 2;
137
138 put(ht, FAMILY, putRows, putColumns, putTimestamps);
139
140 flush(TABLE);
141
142 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
143 scanTimestamps, scanMaxVersions);
144
145 KeyValue[] kvs;
146
147 kvs = scanner.next().raw();
148 assertEquals(1, kvs.length);
149 checkOneCell(kvs[0], FAMILY, 3, 3, 3);
150 kvs = scanner.next().raw();
151 assertEquals(1, kvs.length);
152 checkOneCell(kvs[0], FAMILY, 5, 3, 3);
153 }
154
155 @Test
156 public void testReseeksWithMultipleColumnMultipleTimestamp() throws
157 IOException {
158 byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
159 "ColumnMiltipleTimestamps");
160 byte [] FAMILY = Bytes.toBytes("event_log");
161 byte [][] FAMILIES = new byte[][] { FAMILY };
162
163
164 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
165
166 Integer[] putRows = new Integer[] {1, 3, 5, 7};
167 Integer[] putColumns = new Integer[] { 1, 3, 5};
168 Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
169
170 Integer[] scanRows = new Integer[] {5, 7};
171 Integer[] scanColumns = new Integer[] {3, 4, 5};
172 Long[] scanTimestamps = new Long[] {2l, 3L};
173 int scanMaxVersions = 2;
174
175 put(ht, FAMILY, putRows, putColumns, putTimestamps);
176
177 flush(TABLE);
178
179 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
180 scanTimestamps, scanMaxVersions);
181
182 KeyValue[] kvs;
183
184 kvs = scanner.next().raw();
185 assertEquals(4, kvs.length);
186 checkOneCell(kvs[0], FAMILY, 5, 3, 3);
187 checkOneCell(kvs[1], FAMILY, 5, 3, 2);
188 checkOneCell(kvs[2], FAMILY, 5, 5, 3);
189 checkOneCell(kvs[3], FAMILY, 5, 5, 2);
190 kvs = scanner.next().raw();
191 assertEquals(4, kvs.length);
192 checkOneCell(kvs[0], FAMILY, 7, 3, 3);
193 checkOneCell(kvs[1], FAMILY, 7, 3, 2);
194 checkOneCell(kvs[2], FAMILY, 7, 5, 3);
195 checkOneCell(kvs[3], FAMILY, 7, 5, 2);
196 }
197
198 @Test
199 public void testReseeksWithMultipleFiles() throws IOException {
200 byte [] TABLE = Bytes.toBytes("testReseeksWithMultipleFiles");
201 byte [] FAMILY = Bytes.toBytes("event_log");
202 byte [][] FAMILIES = new byte[][] { FAMILY };
203
204
205 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
206
207 Integer[] putRows1 = new Integer[] {1, 2, 3};
208 Integer[] putColumns1 = new Integer[] { 2, 5, 6};
209 Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
210
211 Integer[] putRows2 = new Integer[] {6, 7};
212 Integer[] putColumns2 = new Integer[] {3, 6};
213 Long[] putTimestamps2 = new Long[] {4L, 5L};
214
215 Integer[] putRows3 = new Integer[] {2, 3, 5};
216 Integer[] putColumns3 = new Integer[] {1, 2, 3};
217 Long[] putTimestamps3 = new Long[] {4L,8L};
218
219
220 Integer[] scanRows = new Integer[] {3, 5, 7};
221 Integer[] scanColumns = new Integer[] {3, 4, 5};
222 Long[] scanTimestamps = new Long[] {2l, 4L};
223 int scanMaxVersions = 5;
224
225 put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
226 flush(TABLE);
227 put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
228 flush(TABLE);
229 put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
230
231 ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
232 scanTimestamps, scanMaxVersions);
233
234 KeyValue[] kvs;
235
236 kvs = scanner.next().raw();
237 assertEquals(2, kvs.length);
238 checkOneCell(kvs[0], FAMILY, 3, 3, 4);
239 checkOneCell(kvs[1], FAMILY, 3, 5, 2);
240
241 kvs = scanner.next().raw();
242 assertEquals(1, kvs.length);
243 checkOneCell(kvs[0], FAMILY, 5, 3, 4);
244
245 kvs = scanner.next().raw();
246 assertEquals(1, kvs.length);
247 checkOneCell(kvs[0], FAMILY, 6, 3, 4);
248
249 kvs = scanner.next().raw();
250 assertEquals(1, kvs.length);
251 checkOneCell(kvs[0], FAMILY, 7, 3, 4);
252 }
253
254 @Test
255 public void testWithVersionDeletes() throws Exception {
256
257
258 testWithVersionDeletes(false);
259
260
261 testWithVersionDeletes(true);
262 }
263
264 public void testWithVersionDeletes(boolean flushTables) throws IOException {
265 byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
266 (flushTables ? "flush" : "noflush"));
267 byte [] FAMILY = Bytes.toBytes("event_log");
268 byte [][] FAMILIES = new byte[][] { FAMILY };
269
270
271 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
272
273
274 putNVersions(ht, FAMILY, 0, 0, 1, 5);
275
276 if (flushTables) {
277 flush(TABLE);
278 }
279
280
281 deleteOneVersion(ht, FAMILY, 0, 0, 4);
282
283
284
285 KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
286 Arrays.asList(2L, 3L, 4L, 5L));
287 assertEquals(3, kvs.length);
288 checkOneCell(kvs[0], FAMILY, 0, 0, 5);
289 checkOneCell(kvs[1], FAMILY, 0, 0, 3);
290 checkOneCell(kvs[2], FAMILY, 0, 0, 2);
291 }
292
293 @Test
294 public void testWithMultipleVersionDeletes() throws IOException {
295 byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
296 byte [] FAMILY = Bytes.toBytes("event_log");
297 byte [][] FAMILIES = new byte[][] { FAMILY };
298
299
300 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
301
302
303 putNVersions(ht, FAMILY, 0, 0, 1, 5);
304
305 flush(TABLE);
306
307
308 deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
309
310
311
312 KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
313 assertEquals(0, kvs.length);
314 }
315
316 @Test
317 public void testWithColumnDeletes() throws IOException {
318 byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
319 byte [] FAMILY = Bytes.toBytes("event_log");
320 byte [][] FAMILIES = new byte[][] { FAMILY };
321
322
323 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
324
325
326 putNVersions(ht, FAMILY, 0, 0, 1, 5);
327
328 flush(TABLE);
329
330
331 deleteColumn(ht, FAMILY, 0, 0);
332
333
334
335 KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
336 assertEquals(0, kvs.length);
337 }
338
339 @Test
340 public void testWithFamilyDeletes() throws IOException {
341 byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
342 byte [] FAMILY = Bytes.toBytes("event_log");
343 byte [][] FAMILIES = new byte[][] { FAMILY };
344
345
346 HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
347
348
349 putNVersions(ht, FAMILY, 0, 0, 1, 5);
350
351 flush(TABLE);
352
353
354 deleteFamily(ht, FAMILY, 0);
355
356
357
358 KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
359 assertEquals(0, kvs.length);
360 }
361
362
363 private void flush(byte [] tableName) throws IOException {
364 TEST_UTIL.flush(tableName);
365 try {
366 Thread.sleep(3000);
367 } catch (InterruptedException i) {
368
369 }
370 }
371
372
373
374
375
376 private void checkOneCell(KeyValue kv, byte[] cf,
377 int rowIdx, int colIdx, long ts) {
378
379 String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
380
381 assertEquals("Row mismatch which checking: " + ctx,
382 "row:"+ rowIdx, Bytes.toString(kv.getRow()));
383
384 assertEquals("ColumnFamily mismatch while checking: " + ctx,
385 Bytes.toString(cf), Bytes.toString(kv.getFamily()));
386
387 assertEquals("Column qualifier mismatch while checking: " + ctx,
388 "column:" + colIdx,
389 Bytes.toString(kv.getQualifier()));
390
391 assertEquals("Timestamp mismatch while checking: " + ctx,
392 ts, kv.getTimestamp());
393
394 assertEquals("Value mismatch while checking: " + ctx,
395 "value-version-" + ts, Bytes.toString(kv.getValue()));
396 }
397
398
399
400
401
402
403 private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
404 int colIdx, List<Long> versions)
405 throws IOException {
406 byte row[] = Bytes.toBytes("row:" + rowIdx);
407 byte column[] = Bytes.toBytes("column:" + colIdx);
408 Get get = new Get(row);
409 get.addColumn(cf, column);
410 get.setMaxVersions();
411 get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
412 Result result = ht.get(get);
413
414 return result.raw();
415 }
416
417 private ResultScanner scan(HTable ht, byte[] cf,
418 Integer[] rowIndexes, Integer[] columnIndexes,
419 Long[] versions, int maxVersions)
420 throws IOException {
421 Arrays.asList(rowIndexes);
422 byte startRow[] = Bytes.toBytes("row:" +
423 Collections.min( Arrays.asList(rowIndexes)));
424 byte endRow[] = Bytes.toBytes("row:" +
425 Collections.max( Arrays.asList(rowIndexes))+1);
426 Scan scan = new Scan(startRow, endRow);
427 for (Integer colIdx: columnIndexes) {
428 byte column[] = Bytes.toBytes("column:" + colIdx);
429 scan.addColumn(cf, column);
430 }
431 scan.setMaxVersions(maxVersions);
432 scan.setTimeRange(Collections.min(Arrays.asList(versions)),
433 Collections.max(Arrays.asList(versions))+1);
434 ResultScanner scanner = ht.getScanner(scan);
435 return scanner;
436 }
437
438 private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
439 Integer[] columnIndexes, Long[] versions)
440 throws IOException {
441 for (int rowIdx: rowIndexes) {
442 byte row[] = Bytes.toBytes("row:" + rowIdx);
443 Put put = new Put(row);
444 for(int colIdx: columnIndexes) {
445 byte column[] = Bytes.toBytes("column:" + colIdx);
446 for (long version: versions) {
447 put.add(cf, column, version, Bytes.toBytes("value-version-" +
448 version));
449 }
450 }
451 ht.put(put);
452 }
453 }
454
455
456
457
458
459 private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
460 long versionStart, long versionEnd)
461 throws IOException {
462 byte row[] = Bytes.toBytes("row:" + rowIdx);
463 byte column[] = Bytes.toBytes("column:" + colIdx);
464 Put put = new Put(row);
465
466 for (long idx = versionStart; idx <= versionEnd; idx++) {
467 put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
468 }
469
470 ht.put(put);
471 }
472
473
474
475
476
477 private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
478 int colIdx, long version)
479 throws IOException {
480 byte row[] = Bytes.toBytes("row:" + rowIdx);
481 byte column[] = Bytes.toBytes("column:" + colIdx);
482 Delete del = new Delete(row);
483 del.deleteColumn(cf, column, version);
484 ht.delete(del);
485 }
486
487
488
489
490
491 private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
492 int colIdx, long version)
493 throws IOException {
494 byte row[] = Bytes.toBytes("row:" + rowIdx);
495 byte column[] = Bytes.toBytes("column:" + colIdx);
496 Delete del = new Delete(row);
497 del.deleteColumns(cf, column, version);
498 ht.delete(del);
499 }
500
501 private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
502 byte row[] = Bytes.toBytes("row:" + rowIdx);
503 byte column[] = Bytes.toBytes("column:" + colIdx);
504 Delete del = new Delete(row);
505 del.deleteColumns(cf, column);
506 ht.delete(del);
507 }
508
509 private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
510 byte row[] = Bytes.toBytes("row:" + rowIdx);
511 Delete del = new Delete(row);
512 del.deleteFamily(cf);
513 ht.delete(del);
514 }
515 }
516