View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.conf.Configuration;
25  import org.apache.hadoop.fs.FileSystem;
26  import org.apache.hadoop.fs.Path;
27  import org.apache.hadoop.hbase.KeyValue;
28  import org.apache.hadoop.hbase.KeyValue.KVComparator;
29  import org.apache.hadoop.hbase.client.Scan;
30  import org.apache.hadoop.hbase.io.HalfStoreFileReader;
31  import org.apache.hadoop.hbase.io.Reference;
32  import org.apache.hadoop.hbase.io.hfile.BlockCache;
33  import org.apache.hadoop.hbase.io.hfile.Compression;
34  import org.apache.hadoop.hbase.io.hfile.HFile;
35  import org.apache.hadoop.hbase.io.hfile.HFileScanner;
36  import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
37  import org.apache.hadoop.hbase.util.BloomFilter;
38  import org.apache.hadoop.hbase.util.ByteBloomFilter;
39  import org.apache.hadoop.hbase.util.Bytes;
40  import org.apache.hadoop.hbase.util.Hash;
41  import org.apache.hadoop.hbase.util.Writables;
42  import org.apache.hadoop.io.RawComparator;
43  import org.apache.hadoop.io.WritableUtils;
44  import org.apache.hadoop.util.StringUtils;
45  
46  import com.google.common.base.Function;
47  import com.google.common.collect.ImmutableList;
48  import com.google.common.collect.Ordering;
49  
50  import java.io.FileNotFoundException;
51  import java.io.IOException;
52  import java.lang.management.ManagementFactory;
53  import java.lang.management.MemoryUsage;
54  import java.nio.ByteBuffer;
55  import java.text.NumberFormat;
56  import java.util.Arrays;
57  import java.util.Collections;
58  import java.util.Comparator;
59  import java.util.List;
60  import java.util.Map;
61  import java.util.Random;
62  import java.util.SortedSet;
63  import java.util.concurrent.atomic.AtomicBoolean;
64  import java.util.regex.Matcher;
65  import java.util.regex.Pattern;
66  
67  /**
68   * A Store data file.  Stores usually have one or more of these files.  They
69   * are produced by flushing the memstore to disk.  To
70   * create, call {@link #createWriter(FileSystem, Path, int)} and append data.  Be
71   * sure to add any metadata before calling close on the Writer
72   * (Use the appendMetadata convenience methods). On close, a StoreFile is
73   * sitting in the Filesystem.  To refer to it, create a StoreFile instance
74   * passing filesystem and path.  To read, call {@link #createReader()}.
75   * <p>StoreFiles may also reference store files in another Store.
76   *
77   * The reason for this weird pattern where you use a different instance for the
78   * writer and a reader is that we write once but read a lot more.
79   */
80  public class StoreFile {
81    static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
82  
83    // Config keys.
84    static final String IO_STOREFILE_BLOOM_ERROR_RATE = "io.storefile.bloom.error.rate";
85    static final String IO_STOREFILE_BLOOM_MAX_FOLD = "io.storefile.bloom.max.fold";
86    static final String IO_STOREFILE_BLOOM_MAX_KEYS = "io.storefile.bloom.max.keys";
87    static final String IO_STOREFILE_BLOOM_ENABLED = "io.storefile.bloom.enabled";
88    static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size";
89  
90    public static enum BloomType {
91      /**
92       * Bloomfilters disabled
93       */
94      NONE,
95      /**
96       * Bloom enabled with Table row as Key
97       */
98      ROW,
99      /**
100      * Bloom enabled with Table row & column (family+qualifier) as Key
101      */
102     ROWCOL
103   }
104   // Keys for fileinfo values in HFile
105   /** Max Sequence ID in FileInfo */
106   public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
107   /** Major compaction flag in FileInfo */
108   public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
109   /** Bloom filter Type in FileInfo */
110   static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
111   /** Key for Timerange information in metadata*/
112   static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
113 
114   /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
115   static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
116   /** Meta data block name for bloom filter data (ie: bloom bits) */
117   static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
118 
119   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
120   // Need to make it 8k for testing.
121   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
122 
123 
124   private static BlockCache hfileBlockCache = null;
125 
126   private final FileSystem fs;
127   // This file's path.
128   private final Path path;
129   // If this storefile references another, this is the reference instance.
130   private Reference reference;
131   // If this StoreFile references another, this is the other files path.
132   private Path referencePath;
133   // Should the block cache be used or not.
134   private boolean blockcache;
135   // Is this from an in-memory store
136   private boolean inMemory;
137 
138   // Keys for metadata stored in backing HFile.
139   // Set when we obtain a Reader.
140   private long sequenceid = -1;
141 
  // If true, this file was the product of a major compaction.  It is set
  // whenever you get a Reader.
144   private AtomicBoolean majorCompaction = null;
145 
146   /** Meta key set when store file is a result of a bulk load */
147   public static final byte[] BULKLOAD_TASK_KEY =
148     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
149   public static final byte[] BULKLOAD_TIME_KEY =
150     Bytes.toBytes("BULKLOAD_TIMESTAMP");
151 
152   /**
153    * Map of the metadata entries in the corresponding HFile
154    */
155   private Map<byte[], byte[]> metadataMap;
156 
157   /*
158    * Regex that will work for straight filenames and for reference names.
159    * If reference, then the regex has more than just one group.  Group 1 is
160    * this files id.  Group 2 the referenced region name, etc.
161    */
162   private static final Pattern REF_NAME_PARSER =
163     Pattern.compile("^(\\d+)(?:\\.(.+))?$");
164 
165   // StoreFile.Reader
166   private volatile Reader reader;
167 
168   // Used making file ids.
169   private final static Random rand = new Random();
170   private final Configuration conf;
171   private final BloomType bloomType;
172 
173 
174   /**
175    * Constructor, loads a reader and it's indices, etc. May allocate a
176    * substantial amount of ram depending on the underlying files (10-20MB?).
177    *
178    * @param fs  The current file system to use.
179    * @param p  The path of the file.
180    * @param blockcache  <code>true</code> if the block cache is enabled.
181    * @param conf  The current configuration.
182    * @param bt The bloom type to use for this store file
183    * @throws IOException When opening the reader fails.
184    */
185   StoreFile(final FileSystem fs,
186             final Path p,
187             final boolean blockcache,
188             final Configuration conf,
189             final BloomType bt,
190             final boolean inMemory)
191       throws IOException {
192     this.conf = conf;
193     this.fs = fs;
194     this.path = p;
195     this.blockcache = blockcache;
196     this.inMemory = inMemory;
197     if (isReference(p)) {
198       this.reference = Reference.read(fs, p);
199       this.referencePath = getReferredToFile(this.path);
200     }
201     // ignore if the column family config says "no bloom filter"
202     // even if there is one in the hfile.
203     if (conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
204       this.bloomType = bt;
205     } else {
206       this.bloomType = BloomType.NONE;
207       LOG.info("Ignoring bloom filter check for file (disabled in config)");
208     }
209   }
210 
211   /**
212    * @return Path or null if this StoreFile was made with a Stream.
213    */
214   Path getPath() {
215     return this.path;
216   }
217 
218   /**
219    * @return The Store/ColumnFamily this file belongs to.
220    */
221   byte [] getFamily() {
222     return Bytes.toBytes(this.path.getParent().getName());
223   }
224 
225   /**
226    * @return True if this is a StoreFile Reference; call after {@link #open()}
227    * else may get wrong answer.
228    */
229   boolean isReference() {
230     return this.reference != null;
231   }
232 
233   /**
234    * @param p Path to check.
235    * @return True if the path has format of a HStoreFile reference.
236    */
237   public static boolean isReference(final Path p) {
238     return !p.getName().startsWith("_") &&
239       isReference(p, REF_NAME_PARSER.matcher(p.getName()));
240   }
241 
242   /**
243    * @param p Path to check.
244    * @param m Matcher to use.
245    * @return True if the path has format of a HStoreFile reference.
246    */
247   public static boolean isReference(final Path p, final Matcher m) {
248     if (m == null || !m.matches()) {
249       LOG.warn("Failed match of store file name " + p.toString());
250       throw new RuntimeException("Failed match of store file name " +
251           p.toString());
252     }
253     return m.groupCount() > 1 && m.group(2) != null;
254   }
255 
256   /*
257    * Return path to the file referred to by a Reference.  Presumes a directory
258    * hierarchy of <code>${hbase.rootdir}/tablename/regionname/familyname</code>.
259    * @param p Path to a Reference file.
260    * @return Calculated path to parent region file.
261    * @throws IOException
262    */
263   static Path getReferredToFile(final Path p) {
264     Matcher m = REF_NAME_PARSER.matcher(p.getName());
265     if (m == null || !m.matches()) {
266       LOG.warn("Failed match of store file name " + p.toString());
267       throw new RuntimeException("Failed match of store file name " +
268           p.toString());
269     }
270     // Other region name is suffix on the passed Reference file name
271     String otherRegion = m.group(2);
272     // Tabledir is up two directories from where Reference was written.
273     Path tableDir = p.getParent().getParent().getParent();
274     String nameStrippedOfSuffix = m.group(1);
275     // Build up new path with the referenced region in place of our current
276     // region in the reference path.  Also strip regionname suffix from name.
277     return new Path(new Path(new Path(tableDir, otherRegion),
278       p.getParent().getName()), nameStrippedOfSuffix);
279   }
280 
281   /**
282    * @return True if this file was made by a major compaction.
283    */
284   boolean isMajorCompaction() {
285     if (this.majorCompaction == null) {
286       throw new NullPointerException("This has not been set yet");
287     }
288     return this.majorCompaction.get();
289   }
290 
291   /**
292    * @return This files maximum edit sequence id.
293    */
294   public long getMaxSequenceId() {
295     return this.sequenceid;
296   }
297 
298   /**
299    * Return the highest sequence ID found across all storefiles in
300    * the given list. Store files that were created by a mapreduce
301    * bulk load are ignored, as they do not correspond to any edit
302    * log items.
303    * @return 0 if no non-bulk-load files are provided or, this is Store that
304    * does not yet have any store files.
305    */
306   public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
307     long max = 0;
308     for (StoreFile sf : sfs) {
309       if (!sf.isBulkLoadResult()) {
310         max = Math.max(max, sf.getMaxSequenceId());
311       }
312     }
313     return max;
314   }
315 
316   /**
317    * @return true if this storefile was created by HFileOutputFormat
318    * for a bulk load.
319    */
320   boolean isBulkLoadResult() {
321     return metadataMap.containsKey(BULKLOAD_TIME_KEY);
322   }
323 
324   /**
325    * Return the timestamp at which this bulk load file was generated.
326    */
327   public long getBulkLoadTimestamp() {
328     return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
329   }
330 
331   /**
332    * Returns the block cache or <code>null</code> in case none should be used.
333    *
334    * @param conf  The current configuration.
335    * @return The block cache or <code>null</code>.
336    */
337   public static synchronized BlockCache getBlockCache(Configuration conf) {
338     if (hfileBlockCache != null) return hfileBlockCache;
339 
340     float cachePercentage = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.2f);
341     // There should be a better way to optimize this. But oh well.
342     if (cachePercentage == 0L) return null;
343     if (cachePercentage > 1.0) {
344       throw new IllegalArgumentException(HFILE_BLOCK_CACHE_SIZE_KEY +
345         " must be between 0.0 and 1.0, not > 1.0");
346     }
347 
348     // Calculate the amount of heap to give the heap.
349     MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
350     long cacheSize = (long)(mu.getMax() * cachePercentage);
351     LOG.info("Allocating LruBlockCache with maximum size " +
352       StringUtils.humanReadableInt(cacheSize));
353     hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
354     return hfileBlockCache;
355   }
356 
357   /**
358    * @return the blockcache
359    */
360   public BlockCache getBlockCache() {
361     return blockcache ? getBlockCache(conf) : null;
362   }
363 
364   /**
365    * Opens reader on this store file.  Called by Constructor.
366    * @return Reader for the store file.
367    * @throws IOException
368    * @see #closeReader()
369    */
370   private Reader open() throws IOException {
371     if (this.reader != null) {
372       throw new IllegalAccessError("Already open");
373     }
374     if (isReference()) {
375       this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
376           getBlockCache(), this.reference);
377     } else {
378       this.reader = new Reader(this.fs, this.path, getBlockCache(),
379           this.inMemory);
380     }
381     // Load up indices and fileinfo.
382     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
383     // Read in our metadata.
384     byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
385     if (b != null) {
386       // By convention, if halfhfile, top half has a sequence number > bottom
387       // half. Thats why we add one in below. Its done for case the two halves
388       // are ever merged back together --rare.  Without it, on open of store,
389       // since store files are distingushed by sequence id, the one half would
390       // subsume the other.
391       this.sequenceid = Bytes.toLong(b);
392       if (isReference()) {
393         if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
394           this.sequenceid += 1;
395         }
396       }
397     }
398     this.reader.setSequenceID(this.sequenceid);
399 
400     b = metadataMap.get(MAJOR_COMPACTION_KEY);
401     if (b != null) {
402       boolean mc = Bytes.toBoolean(b);
403       if (this.majorCompaction == null) {
404         this.majorCompaction = new AtomicBoolean(mc);
405       } else {
406         this.majorCompaction.set(mc);
407       }
408     } else {
409       // Presume it is not major compacted if it doesn't explicity say so
410       // HFileOutputFormat explicitly sets the major compacted key.
411       this.majorCompaction = new AtomicBoolean(false);
412     }
413 
414     if (this.bloomType != BloomType.NONE) {
415       this.reader.loadBloomfilter();
416     }
417 
418     try {
419       byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
420       if (timerangeBytes != null) {
421         this.reader.timeRangeTracker = new TimeRangeTracker();
422         Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
423       }
424     } catch (IllegalArgumentException e) {
425       LOG.error("Error reading timestamp range data from meta -- " +
426           "proceeding without", e);
427       this.reader.timeRangeTracker = null;
428     }
429     return this.reader;
430   }
431 
432   /**
433    * @return Reader for StoreFile. creates if necessary
434    * @throws IOException
435    */
436   public Reader createReader() throws IOException {
437     if (this.reader == null) {
438       this.reader = open();
439     }
440     return this.reader;
441   }
442 
443   /**
444    * @return Current reader.  Must call createReader first else returns null.
445    * @throws IOException
446    * @see #createReader()
447    */
448   public Reader getReader() {
449     return this.reader;
450   }
451 
452   /**
453    * @throws IOException
454    */
455   public synchronized void closeReader() throws IOException {
456     if (this.reader != null) {
457       this.reader.close();
458       this.reader = null;
459     }
460   }
461 
462   /**
463    * Delete this file
464    * @throws IOException
465    */
466   public void deleteReader() throws IOException {
467     closeReader();
468     this.fs.delete(getPath(), true);
469   }
470 
471   @Override
472   public String toString() {
473     return this.path.toString() +
474       (isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
475   }
476 
477   /**
478    * @return a length description of this StoreFile, suitable for debug output
479    */
480   public String toStringDetailed() {
481     StringBuilder sb = new StringBuilder();
482     sb.append(this.path.toString());
483     sb.append(", isReference=").append(isReference());
484     sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
485     if (isBulkLoadResult()) {
486       sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
487     } else {
488       sb.append(", seqid=").append(getMaxSequenceId());
489     }
490     sb.append(", majorCompaction=").append(isMajorCompaction());
491 
492     return sb.toString();
493   }
494 
495   /**
496    * Utility to help with rename.
497    * @param fs
498    * @param src
499    * @param tgt
500    * @return True if succeeded.
501    * @throws IOException
502    */
503   public static Path rename(final FileSystem fs,
504                             final Path src,
505                             final Path tgt)
506       throws IOException {
507 
508     if (!fs.exists(src)) {
509       throw new FileNotFoundException(src.toString());
510     }
511     if (!fs.rename(src, tgt)) {
512       throw new IOException("Failed rename of " + src + " to " + tgt);
513     }
514     return tgt;
515   }
516 
517   /**
518    * Get a store file writer. Client is responsible for closing file when done.
519    *
520    * @param fs
521    * @param dir Path to family directory.  Makes the directory if doesn't exist.
522    * Creates a file with a unique name in this directory.
523    * @param blocksize size per filesystem block
524    * @return StoreFile.Writer
525    * @throws IOException
526    */
527   public static Writer createWriter(final FileSystem fs,
528                                               final Path dir,
529                                               final int blocksize)
530       throws IOException {
531 
532     return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0);
533   }
534 
535   /**
536    * Create a store file writer. Client is responsible for closing file when done.
537    * If metadata, add BEFORE closing using appendMetadata()
538    * @param fs
539    * @param dir Path to family directory.  Makes the directory if doesn't exist.
540    * Creates a file with a unique name in this directory.
541    * @param blocksize
542    * @param algorithm Pass null to get default.
543    * @param conf HBase system configuration. used with bloom filters
544    * @param bloomType column family setting for bloom filters
545    * @param c Pass null to get default.
546    * @param maxKeySize peak theoretical entry size (maintains error rate)
547    * @return HFile.Writer
548    * @throws IOException
549    */
550   public static StoreFile.Writer createWriter(final FileSystem fs,
551                                               final Path dir,
552                                               final int blocksize,
553                                               final Compression.Algorithm algorithm,
554                                               final KeyValue.KVComparator c,
555                                               final Configuration conf,
556                                               BloomType bloomType,
557                                               int maxKeySize)
558       throws IOException {
559 
560     if (!fs.exists(dir)) {
561       fs.mkdirs(dir);
562     }
563     Path path = getUniqueFile(fs, dir);
564     if(conf == null || !conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
565       bloomType = BloomType.NONE;
566     }
567 
568     return new Writer(fs, path, blocksize,
569         algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
570         conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize);
571   }
572 
573   /**
574    * @param fs
575    * @param dir Directory to create file in.
576    * @return random filename inside passed <code>dir</code>
577    */
578   public static Path getUniqueFile(final FileSystem fs, final Path dir)
579       throws IOException {
580     if (!fs.getFileStatus(dir).isDir()) {
581       throw new IOException("Expecting " + dir.toString() +
582         " to be a directory");
583     }
584     return fs.getFileStatus(dir).isDir()? getRandomFilename(fs, dir): dir;
585   }
586 
587   /**
588    *
589    * @param fs
590    * @param dir
591    * @return Path to a file that doesn't exist at time of this invocation.
592    * @throws IOException
593    */
594   static Path getRandomFilename(final FileSystem fs, final Path dir)
595       throws IOException {
596     return getRandomFilename(fs, dir, null);
597   }
598 
599   /**
600    *
601    * @param fs
602    * @param dir
603    * @param suffix
604    * @return Path to a file that doesn't exist at time of this invocation.
605    * @throws IOException
606    */
607   static Path getRandomFilename(final FileSystem fs,
608                                 final Path dir,
609                                 final String suffix)
610       throws IOException {
611     long id = -1;
612     Path p = null;
613     do {
614       id = Math.abs(rand.nextLong());
615       p = new Path(dir, Long.toString(id) +
616         ((suffix == null || suffix.length() <= 0)? "": suffix));
617     } while(fs.exists(p));
618     return p;
619   }
620 
621   /**
622    * Write out a split reference.
623    *
624    * Package local so it doesnt leak out of regionserver.
625    *
626    * @param fs
627    * @param splitDir Presumes path format is actually
628    * <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
629    * @param f File to split.
630    * @param splitRow
631    * @param range
632    * @return Path to created reference.
633    * @throws IOException
634    */
635   static Path split(final FileSystem fs,
636                     final Path splitDir,
637                     final StoreFile f,
638                     final byte [] splitRow,
639                     final Reference.Range range)
640       throws IOException {
641     // A reference to the bottom half of the hsf store file.
642     Reference r = new Reference(splitRow, range);
643     // Add the referred-to regions name as a dot separated suffix.
644     // See REF_NAME_PARSER regex above.  The referred-to regions name is
645     // up in the path of the passed in <code>f</code> -- parentdir is family,
646     // then the directory above is the region name.
647     String parentRegionName = f.getPath().getParent().getParent().getName();
648     // Write reference with same file id only with the other region name as
649     // suffix and into the new region location (under same family).
650     Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
651     return r.write(fs, p);
652   }
653 
654 
  /**
   * A StoreFile writer.  Use this to write HBase Store Files.  It is an
   * implementation detail of the HBase regionserver, although the class itself
   * is declared public.
   */
659   public static class Writer {
660     private final BloomFilter bloomFilter;
661     private final BloomType bloomType;
662     private KVComparator kvComparator;
663     private KeyValue lastKv = null;
664     private byte[] lastByteArray = null;
665     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
666     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
667      * When flushing a memstore, we set TimeRange and use this variable to
668      * indicate that it doesn't need to be calculated again while
669      * appending KeyValues.
670      * It is not set in cases of compactions when it is recalculated using only
671      * the appended KeyValues*/
672     boolean isTimeRangeTrackerSet = false;
673 
674     protected HFile.Writer writer;
675     /**
676      * Creates an HFile.Writer that also write helpful meta data.
677      * @param fs file system to write to
678      * @param path file name to create
679      * @param blocksize HDFS block size
680      * @param compress HDFS block compression
681      * @param conf user configuration
682      * @param comparator key comparator
683      * @param bloomType bloom filter setting
684      * @param maxKeys maximum amount of keys to add (for blooms)
685      * @throws IOException problem writing to FS
686      */
687     public Writer(FileSystem fs, Path path, int blocksize,
688         Compression.Algorithm compress, final Configuration conf,
689         final KVComparator comparator, BloomType bloomType, int maxKeys)
690         throws IOException {
691       writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator());
692 
693       this.kvComparator = comparator;
694 
695       BloomFilter bloom = null;
696       BloomType bt = BloomType.NONE;
697 
698       if (bloomType != BloomType.NONE && conf != null) {
699         float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
700         // Since in row+col blooms we have 2 calls to shouldSeek() instead of 1
701         // and the false positives are adding up, we should keep the error rate
702         // twice as low in order to maintain the number of false positives as
703         // desired by the user
704         if (bloomType == BloomType.ROWCOL) {
705           err /= 2;
706         }
707         int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
708         int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128*1000*1000);
709         
710         if (maxKeys < tooBig) { 
711           try {
712             bloom = new ByteBloomFilter(maxKeys, err,
713                 Hash.getHashType(conf), maxFold);
714             bloom.allocBloom();
715             bt = bloomType;
716           } catch (IllegalArgumentException iae) {
717             LOG.warn(String.format(
718               "Parse error while creating bloom for %s (%d, %f)", 
719               path, maxKeys, err), iae);
720             bloom = null;
721             bt = BloomType.NONE;
722           }
723         } else {
724           if (LOG.isDebugEnabled()) {
725             LOG.debug("Skipping bloom filter because max keysize too large: " 
726                 + maxKeys);
727           }
728         }
729       }
730 
731       this.bloomFilter = bloom;
732       this.bloomType = bt;
733     }
734 
735     /**
736      * Writes meta data.
737      * Call before {@link #close()} since its written as meta data to this file.
738      * @param maxSequenceId Maximum sequence id.
739      * @param majorCompaction True if this file is product of a major compaction
740      * @throws IOException problem writing to FS
741      */
742     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
743     throws IOException {
744       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
745       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
746           Bytes.toBytes(majorCompaction));
747       appendTimeRangeMetadata();
748     }
749 
750     /**
751      * Add TimestampRange to Metadata
752      */
753     public void appendTimeRangeMetadata() throws IOException {
754       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
755     }
756 
757     /**
758      * Set TimeRangeTracker
759      * @param trt
760      */
761     public void setTimeRangeTracker(final TimeRangeTracker trt) {
762       this.timeRangeTracker = trt;
763       isTimeRangeTrackerSet = true;
764     }
765 
766     /**
767      * If the timeRangeTracker is not set,
768      * update TimeRangeTracker to include the timestamp of this key
769      * @param kv
770      * @throws IOException
771      */
772     public void includeInTimeRangeTracker(final KeyValue kv) {
773       if (!isTimeRangeTrackerSet) {
774         timeRangeTracker.includeTimestamp(kv);
775       }
776     }
777 
778     /**
779      * If the timeRangeTracker is not set,
780      * update TimeRangeTracker to include the timestamp of this key
781      * @param key
782      * @throws IOException
783      */
784     public void includeInTimeRangeTracker(final byte [] key) {
785       if (!isTimeRangeTrackerSet) {
786         timeRangeTracker.includeTimestamp(key);
787       }
788     }
789 
790     public void append(final KeyValue kv) throws IOException {
791       if (this.bloomFilter != null) {
792         // only add to the bloom filter on a new, unique key
793         boolean newKey = true;
794         if (this.lastKv != null) {
795           switch(bloomType) {
796           case ROW:
797             newKey = ! kvComparator.matchingRows(kv, lastKv);
798             break;
799           case ROWCOL:
800             newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
801             break;
802           case NONE:
803             newKey = false;
804           }
805         }
806         if (newKey) {
807           /*
808            * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
809            * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
810            *
811            * 2 Types of Filtering:
812            *  1. Row = Row
813            *  2. RowCol = Row + Qualifier
814            */
815           switch (bloomType) {
816           case ROW:
817             this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(),
818                 kv.getRowLength());
819             break;
820           case ROWCOL:
821             // merge(row, qualifier)
822             int ro = kv.getRowOffset();
823             int rl = kv.getRowLength();
824             int qo = kv.getQualifierOffset();
825             int ql = kv.getQualifierLength();
826             byte [] result = new byte[rl + ql];
827             System.arraycopy(kv.getBuffer(), ro, result, 0,  rl);
828             System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
829             this.bloomFilter.add(result);
830             break;
831           default:
832           }
833           this.lastKv = kv;
834         }
835       }
836       writer.append(kv);
837       includeInTimeRangeTracker(kv);
838     }
839 
840     public Path getPath() {
841       return this.writer.getPath();
842     }
843     
844     boolean hasBloom() { 
845       return this.bloomFilter != null;
846     }
847 
848     public void append(final byte [] key, final byte [] value) throws IOException {
849       if (this.bloomFilter != null) {
850         // only add to the bloom filter on a new row
851         if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
852           this.bloomFilter.add(key);
853           this.lastByteArray = key;
854         }
855       }
856       writer.append(key, value);
857       includeInTimeRangeTracker(key);
858     }
859 
860     public void close() throws IOException {
861       // make sure we wrote something to the bloom before adding it
862       if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
863         bloomFilter.compactBloom();
864         if (this.bloomFilter.getMaxKeys() > 0) {
865           int b = this.bloomFilter.getByteSize();
866           int k = this.bloomFilter.getKeyCount();
867           int m = this.bloomFilter.getMaxKeys();
868           StoreFile.LOG.info("Bloom added to HFile (" + 
869               getPath() + "): " + StringUtils.humanReadableInt(b) + ", " +
870               k + "/" + m + " (" + NumberFormat.getPercentInstance().format(
871                 ((double)k) / ((double)m)) + ")");
872         }
873         writer.appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
874         writer.appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
875         writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
876       }
877       writer.close();
878     }
879 
880     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
881       writer.appendFileInfo(key, value);
882     }
883   }
884 
885   /**
886    * Reader for a StoreFile.
887    */
888   public static class Reader {
889     static final Log LOG = LogFactory.getLog(Reader.class.getName());
890 
891     protected BloomFilter bloomFilter = null;
892     protected BloomType bloomFilterType;
893     private final HFile.Reader reader;
894     protected TimeRangeTracker timeRangeTracker = null;
895     protected long sequenceID = -1;
896 
897     public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
898         throws IOException {
899       reader = new HFile.Reader(fs, path, blockCache, inMemory);
900       bloomFilterType = BloomType.NONE;
901     }
902 
903     public RawComparator<byte []> getComparator() {
904       return reader.getComparator();
905     }
906 
907     /**
908      * Get a scanner to scan over this StoreFile.
909      *
910      * @param cacheBlocks should this scanner cache blocks?
911      * @param pread use pread (for highly concurrent small readers)
912      * @return a scanner
913      */
914     public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
915       return new StoreFileScanner(this, getScanner(cacheBlocks, pread));
916     }
917 
918     /**
919      * Warning: Do not write further code which depends on this call. Instead
920      * use getStoreFileScanner() which uses the StoreFileScanner class/interface
921      * which is the preferred way to scan a store with higher level concepts.
922      *
923      * @param cacheBlocks should we cache the blocks?
924      * @param pread use pread (for concurrent small readers)
925      * @return the underlying HFileScanner
926      */
927     @Deprecated
928     public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
929       return reader.getScanner(cacheBlocks, pread);
930     }
931 
932     public void close() throws IOException {
933       reader.close();
934     }
935 
936     public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
937         return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
938     }
939 
940     /**
941      * Check if this storeFile may contain keys within the TimeRange
942      * @param scan
943      * @return False if it definitely does not exist in this StoreFile
944      */
945     private boolean passesTimerangeFilter(Scan scan) {
946       if (timeRangeTracker == null) {
947         return true;
948       } else {
949         return timeRangeTracker.includesTimeRange(scan.getTimeRange());
950       }
951     }
952 
953     private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
954       BloomFilter bm = this.bloomFilter;
955       if (bm == null || !scan.isGetScan()) {
956         return true;
957       }
958       byte[] row = scan.getStartRow();
959       byte[] key;
960       switch (this.bloomFilterType) {
961         case ROW:
962           key = row;
963           break;
964         case ROWCOL:
965           if (columns != null && columns.size() == 1) {
966             byte[] col = columns.first();
967             key = Bytes.add(row, col);
968             break;
969           }
970           //$FALL-THROUGH$
971         default:
972           return true;
973       }
974 
975       try {
976         ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
977         if (bloom != null) {
978           if (this.bloomFilterType == BloomType.ROWCOL) {
979             // Since a Row Delete is essentially a DeleteFamily applied to all
980             // columns, a file might be skipped if using row+col Bloom filter.
981             // In order to ensure this file is included an additional check is
982             // required looking only for a row bloom.
983             return bm.contains(key, bloom) ||
984                 bm.contains(row, bloom);
985           }
986           else {
987             return bm.contains(key, bloom);
988           }
989         }
990       } catch (IOException e) {
991         LOG.error("Error reading bloom filter data -- proceeding without",
992             e);
993         setBloomFilterFaulty();
994       } catch (IllegalArgumentException e) {
995         LOG.error("Bad bloom filter data -- proceeding without", e);
996         setBloomFilterFaulty();
997       }
998 
999       return true;
1000     }
1001 
1002     public Map<byte[], byte[]> loadFileInfo() throws IOException {
1003       Map<byte [], byte []> fi = reader.loadFileInfo();
1004 
1005       byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
1006       if (b != null) {
1007         bloomFilterType = BloomType.valueOf(Bytes.toString(b));
1008       }
1009 
1010       return fi;
1011     }
1012 
1013     public void loadBloomfilter() {
1014       if (this.bloomFilter != null) {
1015         return; // already loaded
1016       }
1017 
1018       try {
1019         ByteBuffer b = reader.getMetaBlock(BLOOM_FILTER_META_KEY, false);
1020         if (b != null) {
1021           if (bloomFilterType == BloomType.NONE) {
1022             throw new IOException("valid bloom filter type not found in FileInfo");
1023           }
1024 
1025 
1026           this.bloomFilter = new ByteBloomFilter(b);
1027           LOG.info("Loaded " + (bloomFilterType== BloomType.ROW? "row":"col")
1028                  + " bloom filter metadata for " + reader.getName());
1029         }
1030       } catch (IOException e) {
1031         LOG.error("Error reading bloom filter meta -- proceeding without", e);
1032         this.bloomFilter = null;
1033       } catch (IllegalArgumentException e) {
1034         LOG.error("Bad bloom filter meta -- proceeding without", e);
1035         this.bloomFilter = null;
1036       }
1037     }
1038 
1039     public int getFilterEntries() {
1040       return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
1041           : reader.getFilterEntries();
1042     }
1043 
1044     public ByteBuffer getMetaBlock(String bloomFilterDataKey, boolean cacheBlock) throws IOException {
1045       return reader.getMetaBlock(bloomFilterDataKey, cacheBlock);
1046     }
1047 
1048     public void setBloomFilterFaulty() {
1049       bloomFilter = null;
1050     }
1051 
1052     public byte[] getLastKey() {
1053       return reader.getLastKey();
1054     }
1055 
1056     public byte[] midkey() throws IOException {
1057       return reader.midkey();
1058     }
1059 
1060     public long length() {
1061       return reader.length();
1062     }
1063 
1064     public int getEntries() {
1065       return reader.getEntries();
1066     }
1067 
1068     public byte[] getFirstKey() {
1069       return reader.getFirstKey();
1070     }
1071 
1072     public long indexSize() {
1073       return reader.indexSize();
1074     }
1075 
1076     public BloomType getBloomFilterType() {
1077       return this.bloomFilterType;
1078     }
1079 
1080     public long getSequenceID() {
1081       return sequenceID;
1082     }
1083 
1084     public void setSequenceID(long sequenceID) {
1085       this.sequenceID = sequenceID;
1086     }
1087   }
1088 
1089   /**
1090    * Useful comparators for comparing StoreFiles.
1091    */
1092   abstract static class Comparators {
1093     /**
1094      * Comparator that compares based on the flush time of
1095      * the StoreFiles. All bulk loads are placed before all non-
1096      * bulk loads, and then all files are sorted by sequence ID.
1097      * If there are ties, the path name is used as a tie-breaker.
1098      */
1099     static final Comparator<StoreFile> FLUSH_TIME =
1100       Ordering.compound(ImmutableList.of(
1101           Ordering.natural().onResultOf(new GetBulkTime()),
1102           Ordering.natural().onResultOf(new GetSeqId()),
1103           Ordering.natural().onResultOf(new GetPathName())
1104       ));
1105 
1106     private static class GetBulkTime implements Function<StoreFile, Long> {
1107       @Override
1108       public Long apply(StoreFile sf) {
1109         if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
1110         return sf.getBulkLoadTimestamp();
1111       }
1112     }
1113     private static class GetSeqId implements Function<StoreFile, Long> {
1114       @Override
1115       public Long apply(StoreFile sf) {
1116         if (sf.isBulkLoadResult()) return -1L;
1117         return sf.getMaxSequenceId();
1118       }
1119     }
1120     private static class GetPathName implements Function<StoreFile, String> {
1121       @Override
1122       public String apply(StoreFile sf) {
1123         return sf.getPath().getName();
1124       }
1125     }
1126 
1127   }
1128 }