1   /**
2    * Copyright 2007 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.Collections;
27  import java.util.Comparator;
28  import java.util.List;
29  import java.util.TreeSet;
30  
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.fs.FileSystem;
34  import org.apache.hadoop.fs.Path;
35  import org.apache.hadoop.hbase.HBaseTestCase;
36  import org.apache.hadoop.hbase.HBaseTestingUtility;
37  import org.apache.hadoop.hbase.HConstants;
38  import org.apache.hadoop.hbase.KeyValue;
39  import org.apache.hadoop.hbase.client.Scan;
40  import org.apache.hadoop.hbase.io.Reference.Range;
41  import org.apache.hadoop.hbase.io.hfile.HFile;
42  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
43  import org.apache.hadoop.hbase.util.ByteBloomFilter;
44  import org.apache.hadoop.hbase.util.Bytes;
45  import org.apache.hadoop.hbase.util.Hash;
46  import org.apache.hadoop.hdfs.MiniDFSCluster;
47  import org.mockito.Mockito;
48  
49  import com.google.common.base.Joiner;
50  import com.google.common.collect.Collections2;
51  import com.google.common.collect.Iterables;
52  import com.google.common.collect.Lists;
53  
/**
 * Tests for {@link StoreFile}: half-file references, bloom filters,
 * flush-time ordering and time-range checks.
 */
57  public class TestStoreFile extends HBaseTestCase {
  /** Logger shared by the tests in this class. */
  static final Log LOG = LogFactory.getLog(TestStoreFile.class);
  /** Mini DFS cluster created in {@link #setUp()} and shut down in {@link #tearDown()}. */
  private MiniDFSCluster cluster;
60  
61    @Override
62    public void setUp() throws Exception {
63      try {
64        this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
65        // Set the hbase.rootdir to be the home directory in mini dfs.
66        this.conf.set(HConstants.HBASE_DIR,
67          this.cluster.getFileSystem().getHomeDirectory().toString());
68      } catch (IOException e) {
69        shutdownDfs(cluster);
70      }
71      super.setUp();
72    }
73  
74    @Override
75    public void tearDown() throws Exception {
76      super.tearDown();
77      shutdownDfs(cluster);
78      // ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
79      //  "Temporary end-of-test thread dump debugging HADOOP-2040: " + getName());
80    }
81  
82    /**
83     * Write a file and then assert that we can read from top and bottom halves
84     * using two HalfMapFiles.
85     * @throws Exception
86     */
87    public void testBasicHalfMapFile() throws Exception {
88      // Make up a directory hierarchy that has a regiondir and familyname.
89      StoreFile.Writer writer = StoreFile.createWriter(this.fs,
90        new Path(new Path(this.testDir, "regionname"), "familyname"), 2 * 1024);
91      writeStoreFile(writer);
92      checkHalfHFile(new StoreFile(this.fs, writer.getPath(), true, conf,
93          StoreFile.BloomType.NONE, false));
94    }
95  
96    private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
97      writeStoreFile(writer, Bytes.toBytes(getName()), Bytes.toBytes(getName()));
98    }
99    /*
100    * Writes HStoreKey and ImmutableBytes data to passed writer and
101    * then closes it.
102    * @param writer
103    * @throws IOException
104    */
105   public static void writeStoreFile(final StoreFile.Writer writer, byte[] fam, byte[] qualifier)
106   throws IOException {
107     long now = System.currentTimeMillis();
108     try {
109       for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
110         for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
111           byte[] b = new byte[] { (byte) d, (byte) e };
112           writer.append(new KeyValue(b, fam, qualifier, now, b));
113         }
114       }
115     } finally {
116       writer.close();
117     }
118   }
119 
120   /**
121    * Test that our mechanism of writing store files in one region to reference
122    * store files in other regions works.
123    * @throws IOException
124    */
125   public void testReference()
126   throws IOException {
127     Path storedir = new Path(new Path(this.testDir, "regionname"), "familyname");
128     Path dir = new Path(storedir, "1234567890");
129     // Make a store file and write data to it.
130     StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
131     writeStoreFile(writer);
132     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
133         StoreFile.BloomType.NONE, false);
134     StoreFile.Reader reader = hsf.createReader();
135     // Split on a row, not in middle of row.  Midkey returned by reader
136     // may be in middle of row.  Create new one with empty column and
137     // timestamp.
138     KeyValue kv = KeyValue.createKeyValueFromKey(reader.midkey());
139     byte [] midRow = kv.getRow();
140     kv = KeyValue.createKeyValueFromKey(reader.getLastKey());
141     byte [] finalRow = kv.getRow();
142     // Make a reference
143     Path refPath = StoreFile.split(fs, dir, hsf, midRow, Range.top);
144     StoreFile refHsf = new StoreFile(this.fs, refPath, true, conf,
145         StoreFile.BloomType.NONE, false);
146     // Now confirm that I can read from the reference and that it only gets
147     // keys from top half of the file.
148     HFileScanner s = refHsf.createReader().getScanner(false, false);
149     for(boolean first = true; (!s.isSeeked() && s.seekTo()) || s.next();) {
150       ByteBuffer bb = s.getKey();
151       kv = KeyValue.createKeyValueFromKey(bb);
152       if (first) {
153         assertTrue(Bytes.equals(kv.getRow(), midRow));
154         first = false;
155       }
156     }
157     assertTrue(Bytes.equals(kv.getRow(), finalRow));
158   }
159 
160   private void checkHalfHFile(final StoreFile f)
161   throws IOException {
162     byte [] midkey = f.createReader().midkey();
163     KeyValue midKV = KeyValue.createKeyValueFromKey(midkey);
164     byte [] midRow = midKV.getRow();
165     // Create top split.
166     Path topDir = Store.getStoreHomedir(this.testDir, "1",
167       Bytes.toBytes(f.getPath().getParent().getName()));
168     if (this.fs.exists(topDir)) {
169       this.fs.delete(topDir, true);
170     }
171     Path topPath = StoreFile.split(this.fs, topDir, f, midRow, Range.top);
172     // Create bottom split.
173     Path bottomDir = Store.getStoreHomedir(this.testDir, "2",
174       Bytes.toBytes(f.getPath().getParent().getName()));
175     if (this.fs.exists(bottomDir)) {
176       this.fs.delete(bottomDir, true);
177     }
178     Path bottomPath = StoreFile.split(this.fs, bottomDir,
179       f, midRow, Range.bottom);
180     // Make readers on top and bottom.
181     StoreFile.Reader top = new StoreFile(this.fs, topPath, true, conf,
182         StoreFile.BloomType.NONE, false).createReader();
183     StoreFile.Reader bottom = new StoreFile(this.fs, bottomPath, true, conf,
184         StoreFile.BloomType.NONE, false).createReader();
185     ByteBuffer previous = null;
186     LOG.info("Midkey: " + midKV.toString());
187     ByteBuffer bbMidkeyBytes = ByteBuffer.wrap(midkey);
188     try {
189       // Now make two HalfMapFiles and assert they can read the full backing
190       // file, one from the top and the other from the bottom.
191       // Test bottom half first.
192       // Now test reading from the top.
193       boolean first = true;
194       ByteBuffer key = null;
195       HFileScanner topScanner = top.getScanner(false, false);
196       while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
197           (topScanner.isSeeked() && topScanner.next())) {
198         key = topScanner.getKey();
199 
200         assertTrue(topScanner.getReader().getComparator().compare(key.array(),
201           key.arrayOffset(), key.limit(), midkey, 0, midkey.length) >= 0);
202         if (first) {
203           first = false;
204           LOG.info("First in top: " + Bytes.toString(Bytes.toBytes(key)));
205         }
206       }
207       LOG.info("Last in top: " + Bytes.toString(Bytes.toBytes(key)));
208 
209       first = true;
210       HFileScanner bottomScanner = bottom.getScanner(false, false);
211       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
212           bottomScanner.next()) {
213         previous = bottomScanner.getKey();
214         key = bottomScanner.getKey();
215         if (first) {
216           first = false;
217           LOG.info("First in bottom: " +
218             Bytes.toString(Bytes.toBytes(previous)));
219         }
220         assertTrue(key.compareTo(bbMidkeyBytes) < 0);
221       }
222       if (previous != null) {
223         LOG.info("Last in bottom: " + Bytes.toString(Bytes.toBytes(previous)));
224       }
225       // Remove references.
226       this.fs.delete(topPath, false);
227       this.fs.delete(bottomPath, false);
228 
229       // Next test using a midkey that does not exist in the file.
230       // First, do a key that is < than first key. Ensure splits behave
231       // properly.
232       byte [] badmidkey = Bytes.toBytes("  .");
233       topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
234       bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
235         Range.bottom);
236       top = new StoreFile(this.fs, topPath, true, conf,
237           StoreFile.BloomType.NONE, false).createReader();
238       bottom = new StoreFile(this.fs, bottomPath, true, conf,
239           StoreFile.BloomType.NONE, false).createReader();
240       bottomScanner = bottom.getScanner(false, false);
241       int count = 0;
242       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
243           bottomScanner.next()) {
244         count++;
245       }
246       // When badkey is < than the bottom, should return no values.
247       assertTrue(count == 0);
248       // Now read from the top.
249       first = true;
250       topScanner = top.getScanner(false, false);
251       while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
252           topScanner.next()) {
253         key = topScanner.getKey();
254         assertTrue(topScanner.getReader().getComparator().compare(key.array(),
255           key.arrayOffset(), key.limit(), badmidkey, 0, badmidkey.length) >= 0);
256         if (first) {
257           first = false;
258           KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
259           LOG.info("First top when key < bottom: " + keyKV);
260           String tmp = Bytes.toString(keyKV.getRow());
261           for (int i = 0; i < tmp.length(); i++) {
262             assertTrue(tmp.charAt(i) == 'a');
263           }
264         }
265       }
266       KeyValue keyKV = KeyValue.createKeyValueFromKey(key);
267       LOG.info("Last top when key < bottom: " + keyKV);
268       String tmp = Bytes.toString(keyKV.getRow());
269       for (int i = 0; i < tmp.length(); i++) {
270         assertTrue(tmp.charAt(i) == 'z');
271       }
272       // Remove references.
273       this.fs.delete(topPath, false);
274       this.fs.delete(bottomPath, false);
275 
276       // Test when badkey is > than last key in file ('||' > 'zz').
277       badmidkey = Bytes.toBytes("|||");
278       topPath = StoreFile.split(this.fs, topDir, f, badmidkey, Range.top);
279       bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey,
280         Range.bottom);
281       top = new StoreFile(this.fs, topPath, true, conf,
282           StoreFile.BloomType.NONE, false).createReader();
283       bottom = new StoreFile(this.fs, bottomPath, true, conf,
284           StoreFile.BloomType.NONE, false).createReader();
285       first = true;
286       bottomScanner = bottom.getScanner(false, false);
287       while ((!bottomScanner.isSeeked() && bottomScanner.seekTo()) ||
288           bottomScanner.next()) {
289         key = bottomScanner.getKey();
290         if (first) {
291           first = false;
292           keyKV = KeyValue.createKeyValueFromKey(key);
293           LOG.info("First bottom when key > top: " + keyKV);
294           tmp = Bytes.toString(keyKV.getRow());
295           for (int i = 0; i < tmp.length(); i++) {
296             assertTrue(tmp.charAt(i) == 'a');
297           }
298         }
299       }
300       keyKV = KeyValue.createKeyValueFromKey(key);
301       LOG.info("Last bottom when key > top: " + keyKV);
302       for (int i = 0; i < tmp.length(); i++) {
303         assertTrue(Bytes.toString(keyKV.getRow()).charAt(i) == 'z');
304       }
305       count = 0;
306       topScanner = top.getScanner(false, false);
307       while ((!topScanner.isSeeked() && topScanner.seekTo()) ||
308           (topScanner.isSeeked() && topScanner.next())) {
309         count++;
310       }
311       // When badkey is < than the bottom, should return no values.
312       assertTrue(count == 0);
313     } finally {
314       if (top != null) {
315         top.close();
316       }
317       if (bottom != null) {
318         bottom.close();
319       }
320       fs.delete(f.getPath(), true);
321     }
322   }
323 
324   private static String ROOT_DIR =
325     HBaseTestingUtility.getTestDir("TestStoreFile").toString();
326   private static String localFormatter = "%010d";
327   
328   private void bloomWriteRead(StoreFile.Writer writer, FileSystem fs) 
329   throws Exception {
330     float err = conf.getFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, 0);
331     Path f = writer.getPath();
332     long now = System.currentTimeMillis();
333     for (int i = 0; i < 2000; i += 2) {
334       String row = String.format(localFormatter, i);
335       KeyValue kv = new KeyValue(row.getBytes(), "family".getBytes(),
336         "col".getBytes(), now, "value".getBytes());
337       writer.append(kv);
338     }
339     writer.close();
340 
341     StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
342     reader.loadFileInfo();
343     reader.loadBloomfilter();
344     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
345 
346     // check false positives rate
347     int falsePos = 0;
348     int falseNeg = 0;
349     for (int i = 0; i < 2000; i++) {
350       String row = String.format(localFormatter, i);
351       TreeSet<byte[]> columns = new TreeSet<byte[]>();
352       columns.add("family:col".getBytes());
353 
354       Scan scan = new Scan(row.getBytes(),row.getBytes());
355       scan.addColumn("family".getBytes(), "family:col".getBytes());
356       boolean exists = scanner.shouldSeek(scan, columns);
357       if (i % 2 == 0) {
358         if (!exists) falseNeg++;
359       } else {
360         if (exists) falsePos++;
361       }
362     }
363     reader.close();
364     fs.delete(f, true);
365     System.out.println("False negatives: " + falseNeg);
366     assertEquals(0, falseNeg);
367     System.out.println("False positives: " + falsePos);
368     if (!(falsePos <= 2* 2000 * err)) {
369       System.out.println("WTFBBQ! " + falsePos + ", " + (2* 2000 * err) );
370     }
371     assertTrue(falsePos <= 2* 2000 * err);    
372   }
373 
374   public void testBloomFilter() throws Exception {
375     FileSystem fs = FileSystem.getLocal(conf);
376     conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
377     conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
378 
379     // write the file
380     Path f = new Path(ROOT_DIR, getName());
381     StoreFile.Writer writer = new StoreFile.Writer(fs, f,
382         StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
383         conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
384 
385     bloomWriteRead(writer, fs);
386   }
387 
388   public void testBloomTypes() throws Exception {
389     float err = (float) 0.01;
390     FileSystem fs = FileSystem.getLocal(conf);
391     conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err);
392     conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
393 
394     int rowCount = 50;
395     int colCount = 10;
396     int versions = 2;
397 
398     // run once using columns and once using rows
399     StoreFile.BloomType[] bt =
400       {StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW};
401     int[] expKeys    = {rowCount*colCount, rowCount};
402     // below line deserves commentary.  it is expected bloom false positives
403     //  column = rowCount*2*colCount inserts
404     //  row-level = only rowCount*2 inserts, but failures will be magnified by
405     //              2nd for loop for every column (2*colCount)
406     float[] expErr   = {2*rowCount*colCount*err, 2*rowCount*2*colCount*err};
407 
408     for (int x : new int[]{0,1}) {
409       // write the file
410       Path f = new Path(ROOT_DIR, getName());
411       StoreFile.Writer writer = new StoreFile.Writer(fs, f,
412           StoreFile.DEFAULT_BLOCKSIZE_SMALL,
413           HFile.DEFAULT_COMPRESSION_ALGORITHM,
414           conf, KeyValue.COMPARATOR, bt[x], expKeys[x]);
415 
416       long now = System.currentTimeMillis();
417       for (int i = 0; i < rowCount*2; i += 2) { // rows
418         for (int j = 0; j < colCount*2; j += 2) {   // column qualifiers
419           String row = String.format(localFormatter, i);
420           String col = String.format(localFormatter, j);
421           for (int k= 0; k < versions; ++k) { // versions
422             KeyValue kv = new KeyValue(row.getBytes(),
423               "family".getBytes(), ("col" + col).getBytes(),
424                 now-k, Bytes.toBytes((long)-1));
425             writer.append(kv);
426           }
427         }
428       }
429       writer.close();
430 
431       StoreFile.Reader reader = new StoreFile.Reader(fs, f, null, false);
432       reader.loadFileInfo();
433       reader.loadBloomfilter();
434       StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
435       assertEquals(expKeys[x], reader.bloomFilter.getKeyCount());
436 
437       // check false positives rate
438       int falsePos = 0;
439       int falseNeg = 0;
440       for (int i = 0; i < rowCount*2; ++i) { // rows
441         for (int j = 0; j < colCount*2; ++j) {   // column qualifiers
442           String row = String.format(localFormatter, i);
443           String col = String.format(localFormatter, j);
444           TreeSet<byte[]> columns = new TreeSet<byte[]>();
445           columns.add(("col" + col).getBytes());
446 
447           Scan scan = new Scan(row.getBytes(),row.getBytes());
448           scan.addColumn("family".getBytes(), ("col"+col).getBytes());
449           boolean exists = scanner.shouldSeek(scan, columns);
450           boolean shouldRowExist = i % 2 == 0;
451           boolean shouldColExist = j % 2 == 0;
452           shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
453           if (shouldRowExist && shouldColExist) {
454             if (!exists) falseNeg++;
455           } else {
456             if (exists) falsePos++;
457           }
458         }
459       }
460       reader.close();
461       fs.delete(f, true);
462       System.out.println(bt[x].toString());
463       System.out.println("  False negatives: " + falseNeg);
464       System.out.println("  False positives: " + falsePos);
465       assertEquals(0, falseNeg);
466       assertTrue(falsePos < 2*expErr[x]);
467     }
468   }
469   
470   public void testBloomEdgeCases() throws Exception {
471     float err = (float)0.005;
472     FileSystem fs = FileSystem.getLocal(conf);
473     Path f = new Path(ROOT_DIR, getName());
474     conf.setFloat(StoreFile.IO_STOREFILE_BLOOM_ERROR_RATE, err);
475     conf.setBoolean(StoreFile.IO_STOREFILE_BLOOM_ENABLED, true);
476     conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, 1000);
477     
478     // this should not create a bloom because the max keys is too small
479     StoreFile.Writer writer = new StoreFile.Writer(fs, f,
480         StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
481         conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 2000);
482     assertFalse(writer.hasBloom());
483     writer.close();
484     fs.delete(f, true);
485     
486     conf.setInt(StoreFile.IO_STOREFILE_BLOOM_MAX_KEYS, Integer.MAX_VALUE);
487 
488     // TODO: commented out because we run out of java heap space on trunk
489     /*
490     // the below config caused IllegalArgumentException in our production cluster
491     // however, the resulting byteSize is < MAX_INT, so this should work properly
492     writer = new StoreFile.Writer(fs, f,
493         StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
494         conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, 272446963);
495     assertTrue(writer.hasBloom());
496     bloomWriteRead(writer, fs);
497     */
498     
499     // this, however, is too large and should not create a bloom
500     // because Java can't create a contiguous array > MAX_INT
501     writer = new StoreFile.Writer(fs, f,
502         StoreFile.DEFAULT_BLOCKSIZE_SMALL, HFile.DEFAULT_COMPRESSION_ALGORITHM,
503         conf, KeyValue.COMPARATOR, StoreFile.BloomType.ROW, Integer.MAX_VALUE);
504     assertFalse(writer.hasBloom());
505     writer.close();
506     fs.delete(f, true);
507   }
508   
509   public void testFlushTimeComparator() {
510     assertOrdering(StoreFile.Comparators.FLUSH_TIME,
511         mockStoreFile(true, 1000, -1, "/foo/123"),
512         mockStoreFile(true, 1000, -1, "/foo/126"),
513         mockStoreFile(true, 2000, -1, "/foo/126"),
514         mockStoreFile(false, -1, 1, "/foo/1"),
515         mockStoreFile(false, -1, 3, "/foo/2"),
516         mockStoreFile(false, -1, 5, "/foo/2"),
517         mockStoreFile(false, -1, 5, "/foo/3"));
518   }
519   
520   /**
521    * Assert that the given comparator orders the given storefiles in the
522    * same way that they're passed.
523    */
524   private void assertOrdering(Comparator<StoreFile> comparator, StoreFile ... sfs) {
525     ArrayList<StoreFile> sorted = Lists.newArrayList(sfs);
526     Collections.shuffle(sorted);
527     Collections.sort(sorted, comparator);
528     LOG.debug("sfs: " + Joiner.on(",").join(sfs));
529     LOG.debug("sorted: " + Joiner.on(",").join(sorted));
530     assertTrue(Iterables.elementsEqual(Arrays.asList(sfs), sorted));
531   }
532 
533   /**
534    * Create a mock StoreFile with the given attributes.
535    */
536   private StoreFile mockStoreFile(boolean bulkLoad, long bulkTimestamp,
537       long seqId, String path) {
538     StoreFile mock = Mockito.mock(StoreFile.class);
539     Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
540     Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
541     if (bulkLoad) {
542       // Bulk load files will throw if you ask for their sequence ID
543       Mockito.doThrow(new IllegalAccessError("bulk load"))
544         .when(mock).getMaxSequenceId();
545     } else {
546       Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
547     }
548     Mockito.doReturn(new Path(path)).when(mock).getPath();
549     String name = "mock storefile, bulkLoad=" + bulkLoad +
550       " bulkTimestamp=" + bulkTimestamp +
551       " seqId=" + seqId +
552       " path=" + path;
553     Mockito.doReturn(name).when(mock).toString();
554     return mock;
555   }
556 
557   /**
558    *Generate a list of KeyValues for testing based on given parameters
559    * @param timestamps
560    * @param numRows
561    * @param qualifier
562    * @param family
563    * @return
564    */
565   List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
566       byte[] qualifier, byte[] family) {
567     List<KeyValue> kvList = new ArrayList<KeyValue>();
568     for (int i=1;i<=numRows;i++) {
569       byte[] b = Bytes.toBytes(i) ;
570       LOG.info(Bytes.toString(b));
571       LOG.info(Bytes.toString(b));
572       for (long timestamp: timestamps)
573       {
574         kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
575       }
576     }
577     return kvList;
578   }
579 
580   /**
581    * Test to ensure correctness when using StoreFile with multiple timestamps
582    * @throws IOException
583    */
584   public void testMultipleTimestamps() throws IOException {
585     byte[] family = Bytes.toBytes("familyname");
586     byte[] qualifier = Bytes.toBytes("qualifier");
587     int numRows = 10;
588     long[] timestamps = new long[] {20,10,5,1};
589     Scan scan = new Scan();
590 
591     Path storedir = new Path(new Path(this.testDir, "regionname"),
592     "familyname");
593     Path dir = new Path(storedir, "1234567890");
594     StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
595 
596     List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
597         family, qualifier);
598 
599     for (KeyValue kv : kvList) {
600       writer.append(kv);
601     }
602     writer.appendMetadata(0, false);
603     writer.close();
604 
605     StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
606         StoreFile.BloomType.NONE, false);
607     StoreFile.Reader reader = hsf.createReader();
608     StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
609     TreeSet<byte[]> columns = new TreeSet<byte[]>();
610     columns.add(qualifier);
611 
612     scan.setTimeRange(20, 100);
613     assertTrue(scanner.shouldSeek(scan, columns));
614 
615     scan.setTimeRange(1, 2);
616     assertTrue(scanner.shouldSeek(scan, columns));
617 
618     scan.setTimeRange(8, 10);
619     assertTrue(scanner.shouldSeek(scan, columns));
620 
621     scan.setTimeRange(7, 50);
622     assertTrue(scanner.shouldSeek(scan, columns));
623 
624     /*This test is not required for correctness but it should pass when
625      * timestamp range optimization is on*/
626     //scan.setTimeRange(27, 50);
627     //assertTrue(!scanner.shouldSeek(scan, columns));
628   }
629 }