1   /**
2    * Copyright 2009 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.client;
21  
22  import static org.junit.Assert.*;
23  
24  import java.io.IOException;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.List;
28  
29  import org.apache.commons.logging.Log;
30  import org.apache.commons.logging.LogFactory;
31  import org.apache.hadoop.hbase.HBaseTestingUtility;
32  import org.apache.hadoop.hbase.KeyValue;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.junit.After;
35  import org.junit.AfterClass;
36  import org.junit.Before;
37  import org.junit.BeforeClass;
38  import org.junit.Test;
39  
40  /**
41   * Run tests related to {@link TimestampsFilter} using HBase client APIs.
42   * Sets up the HBase mini cluster once at start. Each creates a table
43   * named for the method and does its stuff against that.
44   */
45  public class TestMultipleTimestamps {
46    final Log LOG = LogFactory.getLog(getClass());
47    private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
48  
49    /**
50     * @throws java.lang.Exception
51     */
52    @BeforeClass
53    public static void setUpBeforeClass() throws Exception {
54      TEST_UTIL.startMiniCluster(3);
55    }
56  
57    /**
58     * @throws java.lang.Exception
59     */
60    @AfterClass
61    public static void tearDownAfterClass() throws Exception {
62      TEST_UTIL.shutdownMiniCluster();
63    }
64  
65    /**
66     * @throws java.lang.Exception
67     */
68    @Before
69    public void setUp() throws Exception {
70      // Nothing to do.
71    }
72  
73    /**
74     * @throws java.lang.Exception
75     */
76    @After
77    public void tearDown() throws Exception {
78      // Nothing to do.
79    }
80  
81    @Test
82    public void testReseeksWithOneColumnMiltipleTimestamp() throws IOException {
83      byte [] TABLE = Bytes.toBytes("testReseeksWithOne" +
84      "ColumnMiltipleTimestamps");
85      byte [] FAMILY = Bytes.toBytes("event_log");
86      byte [][] FAMILIES = new byte[][] { FAMILY };
87  
88      // create table; set versions to max...
89      HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
90  
91      Integer[] putRows = new Integer[] {1, 3, 5, 7};
92      Integer[] putColumns = new Integer[] { 1, 3, 5};
93      Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
94  
95      Integer[] scanRows = new Integer[] {3, 5};
96      Integer[] scanColumns = new Integer[] {3};
97      Long[] scanTimestamps = new Long[] {3L, 4L};
98      int scanMaxVersions = 2;
99  
100     put(ht, FAMILY, putRows, putColumns, putTimestamps);
101 
102     flush(TABLE);
103 
104     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
105         scanTimestamps, scanMaxVersions);
106 
107     KeyValue[] kvs;
108 
109     kvs = scanner.next().raw();
110     assertEquals(2, kvs.length);
111     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
112     checkOneCell(kvs[1], FAMILY, 3, 3, 3);
113     kvs = scanner.next().raw();
114     assertEquals(2, kvs.length);
115     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
116     checkOneCell(kvs[1], FAMILY, 5, 3, 3);
117   }
118 
119   @Test
120   public void testReseeksWithMultipleColumnOneTimestamp() throws IOException {
121     byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
122     "ColumnOneTimestamps");
123     byte [] FAMILY = Bytes.toBytes("event_log");
124     byte [][] FAMILIES = new byte[][] { FAMILY };
125 
126     // create table; set versions to max...
127     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
128 
129     Integer[] putRows = new Integer[] {1, 3, 5, 7};
130     Integer[] putColumns = new Integer[] { 1, 3, 5};
131     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
132 
133     Integer[] scanRows = new Integer[] {3, 5};
134     Integer[] scanColumns = new Integer[] {3,4};
135     Long[] scanTimestamps = new Long[] {3L};
136     int scanMaxVersions = 2;
137 
138     put(ht, FAMILY, putRows, putColumns, putTimestamps);
139 
140     flush(TABLE);
141 
142     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
143         scanTimestamps, scanMaxVersions);
144 
145     KeyValue[] kvs;
146 
147     kvs = scanner.next().raw();
148     assertEquals(1, kvs.length);
149     checkOneCell(kvs[0], FAMILY, 3, 3, 3);
150     kvs = scanner.next().raw();
151     assertEquals(1, kvs.length);
152     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
153   }
154 
155   @Test
156   public void testReseeksWithMultipleColumnMultipleTimestamp() throws
157   IOException {
158     byte [] TABLE = Bytes.toBytes("testReseeksWithMultiple" +
159     "ColumnMiltipleTimestamps");
160     byte [] FAMILY = Bytes.toBytes("event_log");
161     byte [][] FAMILIES = new byte[][] { FAMILY };
162 
163     // create table; set versions to max...
164     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
165 
166     Integer[] putRows = new Integer[] {1, 3, 5, 7};
167     Integer[] putColumns = new Integer[] { 1, 3, 5};
168     Long[] putTimestamps = new Long[] {1L, 2L, 3L, 4L, 5L};
169 
170     Integer[] scanRows = new Integer[] {5, 7};
171     Integer[] scanColumns = new Integer[] {3, 4, 5};
172     Long[] scanTimestamps = new Long[] {2l, 3L};
173     int scanMaxVersions = 2;
174 
175     put(ht, FAMILY, putRows, putColumns, putTimestamps);
176 
177     flush(TABLE);
178 
179     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
180         scanTimestamps, scanMaxVersions);
181 
182     KeyValue[] kvs;
183 
184     kvs = scanner.next().raw();
185     assertEquals(4, kvs.length);
186     checkOneCell(kvs[0], FAMILY, 5, 3, 3);
187     checkOneCell(kvs[1], FAMILY, 5, 3, 2);
188     checkOneCell(kvs[2], FAMILY, 5, 5, 3);
189     checkOneCell(kvs[3], FAMILY, 5, 5, 2);
190     kvs = scanner.next().raw();
191     assertEquals(4, kvs.length);
192     checkOneCell(kvs[0], FAMILY, 7, 3, 3);
193     checkOneCell(kvs[1], FAMILY, 7, 3, 2);
194     checkOneCell(kvs[2], FAMILY, 7, 5, 3);
195     checkOneCell(kvs[3], FAMILY, 7, 5, 2);
196   }
197 
198   @Test
199   public void testReseeksWithMultipleFiles() throws IOException {
200     byte [] TABLE = Bytes.toBytes("testReseeksWithMultipleFiles");
201     byte [] FAMILY = Bytes.toBytes("event_log");
202     byte [][] FAMILIES = new byte[][] { FAMILY };
203 
204     // create table; set versions to max...
205     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
206 
207     Integer[] putRows1 = new Integer[] {1, 2, 3};
208     Integer[] putColumns1 = new Integer[] { 2, 5, 6};
209     Long[] putTimestamps1 = new Long[] {1L, 2L, 5L};
210 
211     Integer[] putRows2 = new Integer[] {6, 7};
212     Integer[] putColumns2 = new Integer[] {3, 6};
213     Long[] putTimestamps2 = new Long[] {4L, 5L};
214 
215     Integer[] putRows3 = new Integer[] {2, 3, 5};
216     Integer[] putColumns3 = new Integer[] {1, 2, 3};
217     Long[] putTimestamps3 = new Long[] {4L,8L};
218 
219 
220     Integer[] scanRows = new Integer[] {3, 5, 7};
221     Integer[] scanColumns = new Integer[] {3, 4, 5};
222     Long[] scanTimestamps = new Long[] {2l, 4L};
223     int scanMaxVersions = 5;
224 
225     put(ht, FAMILY, putRows1, putColumns1, putTimestamps1);
226     flush(TABLE);
227     put(ht, FAMILY, putRows2, putColumns2, putTimestamps2);
228     flush(TABLE);
229     put(ht, FAMILY, putRows3, putColumns3, putTimestamps3);
230 
231     ResultScanner scanner = scan(ht, FAMILY, scanRows, scanColumns,
232         scanTimestamps, scanMaxVersions);
233 
234     KeyValue[] kvs;
235 
236     kvs = scanner.next().raw();
237     assertEquals(2, kvs.length);
238     checkOneCell(kvs[0], FAMILY, 3, 3, 4);
239     checkOneCell(kvs[1], FAMILY, 3, 5, 2);
240 
241     kvs = scanner.next().raw();
242     assertEquals(1, kvs.length);
243     checkOneCell(kvs[0], FAMILY, 5, 3, 4);
244 
245     kvs = scanner.next().raw();
246     assertEquals(1, kvs.length);
247     checkOneCell(kvs[0], FAMILY, 6, 3, 4);
248 
249     kvs = scanner.next().raw();
250     assertEquals(1, kvs.length);
251     checkOneCell(kvs[0], FAMILY, 7, 3, 4);
252   }
253 
254   @Test
255   public void testWithVersionDeletes() throws Exception {
256 
257     // first test from memstore (without flushing).
258     testWithVersionDeletes(false);
259 
260     // run same test against HFiles (by forcing a flush).
261     testWithVersionDeletes(true);
262   }
263 
264   public void testWithVersionDeletes(boolean flushTables) throws IOException {
265     byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
266         (flushTables ? "flush" : "noflush"));
267     byte [] FAMILY = Bytes.toBytes("event_log");
268     byte [][] FAMILIES = new byte[][] { FAMILY };
269 
270     // create table; set versions to max...
271     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
272 
273     // For row:0, col:0: insert versions 1 through 5.
274     putNVersions(ht, FAMILY, 0, 0, 1, 5);
275 
276     if (flushTables) {
277       flush(TABLE);
278     }
279 
280     // delete version 4.
281     deleteOneVersion(ht, FAMILY, 0, 0, 4);
282 
283     // request a bunch of versions including the deleted version. We should
284     // only get back entries for the versions that exist.
285     KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0,
286         Arrays.asList(2L, 3L, 4L, 5L));
287     assertEquals(3, kvs.length);
288     checkOneCell(kvs[0], FAMILY, 0, 0, 5);
289     checkOneCell(kvs[1], FAMILY, 0, 0, 3);
290     checkOneCell(kvs[2], FAMILY, 0, 0, 2);
291   }
292 
293   @Test
294   public void testWithMultipleVersionDeletes() throws IOException {
295     byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
296     byte [] FAMILY = Bytes.toBytes("event_log");
297     byte [][] FAMILIES = new byte[][] { FAMILY };
298 
299     // create table; set versions to max...
300     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
301 
302     // For row:0, col:0: insert versions 1 through 5.
303     putNVersions(ht, FAMILY, 0, 0, 1, 5);
304 
305     flush(TABLE);
306 
307     // delete all versions before 4.
308     deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
309 
310     // request a bunch of versions including the deleted version. We should
311     // only get back entries for the versions that exist.
312     KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
313     assertEquals(0, kvs.length);
314   }
315 
316   @Test
317   public void testWithColumnDeletes() throws IOException {
318     byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
319     byte [] FAMILY = Bytes.toBytes("event_log");
320     byte [][] FAMILIES = new byte[][] { FAMILY };
321 
322     // create table; set versions to max...
323     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
324 
325     // For row:0, col:0: insert versions 1 through 5.
326     putNVersions(ht, FAMILY, 0, 0, 1, 5);
327 
328     flush(TABLE);
329 
330     // delete all versions before 4.
331     deleteColumn(ht, FAMILY, 0, 0);
332 
333     // request a bunch of versions including the deleted version. We should
334     // only get back entries for the versions that exist.
335     KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
336     assertEquals(0, kvs.length);
337   }
338 
339   @Test
340   public void testWithFamilyDeletes() throws IOException {
341     byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
342     byte [] FAMILY = Bytes.toBytes("event_log");
343     byte [][] FAMILIES = new byte[][] { FAMILY };
344 
345     // create table; set versions to max...
346     HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
347 
348     // For row:0, col:0: insert versions 1 through 5.
349     putNVersions(ht, FAMILY, 0, 0, 1, 5);
350 
351     flush(TABLE);
352 
353     // delete all versions before 4.
354     deleteFamily(ht, FAMILY, 0);
355 
356     // request a bunch of versions including the deleted version. We should
357     // only get back entries for the versions that exist.
358     KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
359     assertEquals(0, kvs.length);
360   }
361 
362   // Flush tables. Since flushing is asynchronous, sleep for a bit.
363   private void flush(byte [] tableName) throws IOException {
364     TEST_UTIL.flush(tableName);
365     try {
366       Thread.sleep(3000);
367     } catch (InterruptedException i) {
368       // ignore
369     }
370   }
371 
372   /**
373    * Assert that the passed in KeyValue has expected contents for the
374    * specified row, column & timestamp.
375    */
376   private void checkOneCell(KeyValue kv, byte[] cf,
377       int rowIdx, int colIdx, long ts) {
378 
379     String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
380 
381     assertEquals("Row mismatch which checking: " + ctx,
382         "row:"+ rowIdx, Bytes.toString(kv.getRow()));
383 
384     assertEquals("ColumnFamily mismatch while checking: " + ctx,
385         Bytes.toString(cf), Bytes.toString(kv.getFamily()));
386 
387     assertEquals("Column qualifier mismatch while checking: " + ctx,
388         "column:" + colIdx,
389         Bytes.toString(kv.getQualifier()));
390 
391     assertEquals("Timestamp mismatch while checking: " + ctx,
392         ts, kv.getTimestamp());
393 
394     assertEquals("Value mismatch while checking: " + ctx,
395         "value-version-" + ts, Bytes.toString(kv.getValue()));
396   }
397 
398   /**
399    * Uses the TimestampFilter on a Get to request a specified list of
400    * versions for the row/column specified by rowIdx & colIdx.
401    *
402    */
403   private  KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
404       int colIdx, List<Long> versions)
405   throws IOException {
406     byte row[] = Bytes.toBytes("row:" + rowIdx);
407     byte column[] = Bytes.toBytes("column:" + colIdx);
408     Get get = new Get(row);
409     get.addColumn(cf, column);
410     get.setMaxVersions();
411     get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
412     Result result = ht.get(get);
413 
414     return result.raw();
415   }
416 
417   private  ResultScanner scan(HTable ht, byte[] cf,
418       Integer[] rowIndexes, Integer[] columnIndexes,
419       Long[] versions, int maxVersions)
420   throws IOException {
421     Arrays.asList(rowIndexes);
422     byte startRow[] = Bytes.toBytes("row:" +
423         Collections.min( Arrays.asList(rowIndexes)));
424     byte endRow[] = Bytes.toBytes("row:" +
425         Collections.max( Arrays.asList(rowIndexes))+1);
426     Scan scan = new Scan(startRow, endRow);
427     for (Integer colIdx: columnIndexes) {
428       byte column[] = Bytes.toBytes("column:" + colIdx);
429       scan.addColumn(cf, column);
430     }
431     scan.setMaxVersions(maxVersions);
432     scan.setTimeRange(Collections.min(Arrays.asList(versions)),
433         Collections.max(Arrays.asList(versions))+1);
434     ResultScanner scanner = ht.getScanner(scan);
435     return scanner;
436   }
437 
438   private void put(HTable ht, byte[] cf, Integer[] rowIndexes,
439       Integer[] columnIndexes, Long[] versions)
440   throws IOException {
441     for (int rowIdx: rowIndexes) {
442       byte row[] = Bytes.toBytes("row:" + rowIdx);
443       Put put = new Put(row);
444       for(int colIdx: columnIndexes) {
445         byte column[] = Bytes.toBytes("column:" + colIdx);
446         for (long version: versions) {
447           put.add(cf, column, version, Bytes.toBytes("value-version-" +
448               version));
449         }
450       }
451       ht.put(put);
452     }
453   }
454 
455   /**
456    * Insert in specific row/column versions with timestamps
457    * versionStart..versionEnd.
458    */
459   private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
460       long versionStart, long versionEnd)
461   throws IOException {
462     byte row[] = Bytes.toBytes("row:" + rowIdx);
463     byte column[] = Bytes.toBytes("column:" + colIdx);
464     Put put = new Put(row);
465 
466     for (long idx = versionStart; idx <= versionEnd; idx++) {
467       put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
468     }
469 
470     ht.put(put);
471   }
472 
473   /**
474    * For row/column specified by rowIdx/colIdx, delete the cell
475    * corresponding to the specified version.
476    */
477   private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
478       int colIdx, long version)
479   throws IOException {
480     byte row[] = Bytes.toBytes("row:" + rowIdx);
481     byte column[] = Bytes.toBytes("column:" + colIdx);
482     Delete del = new Delete(row);
483     del.deleteColumn(cf, column, version);
484     ht.delete(del);
485   }
486 
487   /**
488    * For row/column specified by rowIdx/colIdx, delete all cells
489    * preceeding the specified version.
490    */
491   private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
492       int colIdx, long version)
493   throws IOException {
494     byte row[] = Bytes.toBytes("row:" + rowIdx);
495     byte column[] = Bytes.toBytes("column:" + colIdx);
496     Delete del = new Delete(row);
497     del.deleteColumns(cf, column, version);
498     ht.delete(del);
499   }
500 
501   private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
502     byte row[] = Bytes.toBytes("row:" + rowIdx);
503     byte column[] = Bytes.toBytes("column:" + colIdx);
504     Delete del = new Delete(row);
505     del.deleteColumns(cf, column);
506     ht.delete(del);
507   }
508 
509   private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
510     byte row[] = Bytes.toBytes("row:" + rowIdx);
511     Delete del = new Delete(row);
512     del.deleteFamily(cf);
513     ht.delete(del);
514   }
515 }
516