View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements. See the NOTICE file distributed with this
4    * work for additional information regarding copyright ownership. The ASF
5    * licenses this file to you under the Apache License, Version 2.0 (the
6    * "License"); you may not use this file except in compliance with the License.
7    * You may obtain a copy of the License at
8    *
9    * http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14   * License for the specific language governing permissions and limitations under
15   * the License.
16   */
17  package org.apache.hadoop.hbase.io.hfile;
18  
19  import java.io.BufferedInputStream;
20  import java.io.BufferedOutputStream;
21  import java.io.FilterOutputStream;
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.conf.Configurable;
29  import org.apache.hadoop.conf.Configuration;
30  import org.apache.hadoop.io.compress.CodecPool;
31  import org.apache.hadoop.io.compress.CompressionCodec;
32  import org.apache.hadoop.io.compress.CompressionInputStream;
33  import org.apache.hadoop.io.compress.CompressionOutputStream;
34  import org.apache.hadoop.io.compress.Compressor;
35  import org.apache.hadoop.io.compress.Decompressor;
36  import org.apache.hadoop.io.compress.GzipCodec;
37  import org.apache.hadoop.io.compress.DefaultCodec;
38  import org.apache.hadoop.util.ReflectionUtils;
39  
/**
 * Compression-related utilities for HFile: maps compression-algorithm names
 * to Hadoop codecs and manages pooled compressors and decompressors.
 * Copied from the hadoop-3315 TFile implementation.
 */
44  public final class Compression {
45    static final Log LOG = LogFactory.getLog(Compression.class);
46  
47    /**
48     * Prevent the instantiation of class.
49     */
50    private Compression() {
51      super();
52    }
53  
54    static class FinishOnFlushCompressionStream extends FilterOutputStream {
55      public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
56        super(cout);
57      }
58  
59      @Override
60      public void write(byte b[], int off, int len) throws IOException {
61        out.write(b, off, len);
62      }
63  
64      @Override
65      public void flush() throws IOException {
66        CompressionOutputStream cout = (CompressionOutputStream) out;
67        cout.finish();
68        cout.flush();
69        cout.resetState();
70      }
71    }
72  
73    /**
74     * Compression algorithms. The ordinal of these cannot change or else you
75     * risk breaking all existing HFiles out there.  Even the ones that are
76     * not compressed! (They use the NONE algorithm)
77     */
78    public static enum Algorithm {
79      LZO("lzo") {
80        // Use base type to avoid compile-time dependencies.
81        private transient CompressionCodec lzoCodec;
82  
83        @Override
84        CompressionCodec getCodec(Configuration conf) {
85          if (lzoCodec == null) {
86            try {
87              Class<?> externalCodec =
88                  ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec");
89              lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, 
90                  new Configuration(conf));
91            } catch (ClassNotFoundException e) {
92              throw new RuntimeException(e);
93            }
94          }
95          return lzoCodec;
96        }
97      },
98      GZ("gz") {
99        private transient GzipCodec codec;
100 
101       @Override
102       DefaultCodec getCodec(Configuration conf) {
103         if (codec == null) {
104           codec = new GzipCodec();
105           codec.setConf(new Configuration(conf));
106         }
107 
108         return codec;
109       }
110     },
111 
112     NONE("none") {
113       @Override
114       DefaultCodec getCodec(Configuration conf) {
115         return null;
116       }
117 
118       @Override
119       public synchronized InputStream createDecompressionStream(
120           InputStream downStream, Decompressor decompressor,
121           int downStreamBufferSize) throws IOException {
122         if (downStreamBufferSize > 0) {
123           return new BufferedInputStream(downStream, downStreamBufferSize);
124         }
125         // else {
126           // Make sure we bypass FSInputChecker buffer.
127         // return new BufferedInputStream(downStream, 1024);
128         // }
129         // }
130         return downStream;
131       }
132 
133       @Override
134       public synchronized OutputStream createCompressionStream(
135           OutputStream downStream, Compressor compressor,
136           int downStreamBufferSize) throws IOException {
137         if (downStreamBufferSize > 0) {
138           return new BufferedOutputStream(downStream, downStreamBufferSize);
139         }
140 
141         return downStream;
142       }
143     };
144 
145     private final Configuration conf;
146     private final String compressName;
147 	// data input buffer size to absorb small reads from application.
148     private static final int DATA_IBUF_SIZE = 1 * 1024;
149 	// data output buffer size to absorb small writes from application.
150     private static final int DATA_OBUF_SIZE = 4 * 1024;
151 
152     Algorithm(String name) {
153       this.conf = new Configuration();
154       this.conf.setBoolean("hadoop.native.lib", true);
155       this.compressName = name;
156     }
157 
158     abstract CompressionCodec getCodec(Configuration conf);
159 
160     public InputStream createDecompressionStream(
161         InputStream downStream, Decompressor decompressor,
162         int downStreamBufferSize) throws IOException {
163       CompressionCodec codec = getCodec(conf);
164       // Set the internal buffer size to read from down stream.
165       if (downStreamBufferSize > 0) {
166         ((Configurable)codec).getConf().setInt("io.file.buffer.size",
167             downStreamBufferSize);
168       }
169       CompressionInputStream cis =
170           codec.createInputStream(downStream, decompressor);
171       BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
172       return bis2;
173 
174     }
175 
176     public OutputStream createCompressionStream(
177         OutputStream downStream, Compressor compressor, int downStreamBufferSize)
178         throws IOException {
179       CompressionCodec codec = getCodec(conf);
180       OutputStream bos1 = null;
181       if (downStreamBufferSize > 0) {
182         bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
183       }
184       else {
185         bos1 = downStream;
186       }
187       ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
188       CompressionOutputStream cos =
189           codec.createOutputStream(bos1, compressor);
190       BufferedOutputStream bos2 =
191           new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
192               DATA_OBUF_SIZE);
193       return bos2;
194     }
195 
196     public Compressor getCompressor() {
197       CompressionCodec codec = getCodec(conf);
198       if (codec != null) {
199         Compressor compressor = CodecPool.getCompressor(codec);
200         if (compressor != null) {
201           if (compressor.finished()) {
202             // Somebody returns the compressor to CodecPool but is still using
203             // it.
204             LOG
205                 .warn("Compressor obtained from CodecPool is already finished()");
206             // throw new AssertionError(
207             // "Compressor obtained from CodecPool is already finished()");
208           }
209           compressor.reset();
210         }
211         return compressor;
212       }
213       return null;
214     }
215 
216     public void returnCompressor(Compressor compressor) {
217       if (compressor != null) {
218         CodecPool.returnCompressor(compressor);
219       }
220     }
221 
222     public Decompressor getDecompressor() {
223       CompressionCodec codec = getCodec(conf);
224       if (codec != null) {
225         Decompressor decompressor = CodecPool.getDecompressor(codec);
226         if (decompressor != null) {
227           if (decompressor.finished()) {
228             // Somebody returns the decompressor to CodecPool but is still using
229             // it.
230             LOG
231                 .warn("Deompressor obtained from CodecPool is already finished()");
232             // throw new AssertionError(
233             // "Decompressor obtained from CodecPool is already finished()");
234           }
235           decompressor.reset();
236         }
237         return decompressor;
238       }
239 
240       return null;
241     }
242 
243     public void returnDecompressor(Decompressor decompressor) {
244       if (decompressor != null) {
245         CodecPool.returnDecompressor(decompressor);
246       }
247     }
248 
249     public String getName() {
250       return compressName;
251     }
252   }
253 
254   public static Algorithm getCompressionAlgorithmByName(String compressName) {
255     Algorithm[] algos = Algorithm.class.getEnumConstants();
256 
257     for (Algorithm a : algos) {
258       if (a.getName().equals(compressName)) {
259         return a;
260       }
261     }
262 
263     throw new IllegalArgumentException(
264         "Unsupported compression algorithm name: " + compressName);
265   }
266 
267   static String[] getSupportedAlgorithms() {
268     Algorithm[] algos = Algorithm.class.getEnumConstants();
269 
270     String[] ret = new String[algos.length];
271     int i = 0;
272     for (Algorithm a : algos) {
273       ret[i++] = a.getName();
274     }
275 
276     return ret;
277   }
278 }