1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.hadoop.hbase.io.hfile;
18
19 import java.io.BufferedInputStream;
20 import java.io.BufferedOutputStream;
21 import java.io.FilterOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configurable;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.io.compress.CodecPool;
31 import org.apache.hadoop.io.compress.CompressionCodec;
32 import org.apache.hadoop.io.compress.CompressionInputStream;
33 import org.apache.hadoop.io.compress.CompressionOutputStream;
34 import org.apache.hadoop.io.compress.Compressor;
35 import org.apache.hadoop.io.compress.Decompressor;
36 import org.apache.hadoop.io.compress.GzipCodec;
37 import org.apache.hadoop.io.compress.DefaultCodec;
38 import org.apache.hadoop.util.ReflectionUtils;
39
40
41
42
43
44 public final class Compression {
/** Logger shared by this class and its nested types. */
static final Log LOG = LogFactory.getLog(Compression.class);

/**
 * Not meant to be instantiated: this class only hosts static helpers and
 * nested types.
 */
private Compression() {
  // The implicit call to Object's constructor is all that is needed here.
}
53
54 static class FinishOnFlushCompressionStream extends FilterOutputStream {
55 public FinishOnFlushCompressionStream(CompressionOutputStream cout) {
56 super(cout);
57 }
58
59 @Override
60 public void write(byte b[], int off, int len) throws IOException {
61 out.write(b, off, len);
62 }
63
64 @Override
65 public void flush() throws IOException {
66 CompressionOutputStream cout = (CompressionOutputStream) out;
67 cout.finish();
68 cout.flush();
69 cout.resetState();
70 }
71 }
72
73
74
75
76
77
78 public static enum Algorithm {
79 LZO("lzo") {
80
81 private transient CompressionCodec lzoCodec;
82
83 @Override
84 CompressionCodec getCodec(Configuration conf) {
85 if (lzoCodec == null) {
86 try {
87 Class<?> externalCodec =
88 ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec");
89 lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
90 new Configuration(conf));
91 } catch (ClassNotFoundException e) {
92 throw new RuntimeException(e);
93 }
94 }
95 return lzoCodec;
96 }
97 },
98 GZ("gz") {
99 private transient GzipCodec codec;
100
101 @Override
102 DefaultCodec getCodec(Configuration conf) {
103 if (codec == null) {
104 codec = new GzipCodec();
105 codec.setConf(new Configuration(conf));
106 }
107
108 return codec;
109 }
110 },
111
/**
 * No compression: there is no codec, and the stream factories only add
 * optional buffering around the given stream.
 */
NONE("none") {
  /** @return always {@code null}; this algorithm has no codec */
  @Override
  DefaultCodec getCodec(Configuration conf) {
    return null;
  }

  /**
   * @param downStream the stream to read from
   * @param decompressor not used by this algorithm
   * @param downStreamBufferSize buffer size for the wrapper; when not
   *          positive, {@code downStream} is returned unwrapped
   * @return {@code downStream}, buffered if a positive size was given
   */
  @Override
  public synchronized InputStream createDecompressionStream(
      InputStream downStream, Decompressor decompressor,
      int downStreamBufferSize) throws IOException {
    return downStreamBufferSize > 0
        ? new BufferedInputStream(downStream, downStreamBufferSize)
        : downStream;
  }

  /**
   * @param downStream the stream to write to
   * @param compressor not used by this algorithm
   * @param downStreamBufferSize buffer size for the wrapper; when not
   *          positive, {@code downStream} is returned unwrapped
   * @return {@code downStream}, buffered if a positive size was given
   */
  @Override
  public synchronized OutputStream createCompressionStream(
      OutputStream downStream, Compressor compressor,
      int downStreamBufferSize) throws IOException {
    return downStreamBufferSize > 0
        ? new BufferedOutputStream(downStream, downStreamBufferSize)
        : downStream;
  }
};
144
/** Size in bytes of the buffer placed around the decompressed input stream. */
private static final int DATA_IBUF_SIZE = 1024;

/** Size in bytes of the buffer placed in front of the compressing output stream. */
private static final int DATA_OBUF_SIZE = 4096;

/** Per-algorithm configuration handed to {@link #getCodec(Configuration)}. */
private final Configuration conf;

/** Name reported by {@code getName()} and matched when looking an algorithm up by name. */
private final String compressName;

/**
 * @param name the textual name of this algorithm
 */
Algorithm(String name) {
  final Configuration base = new Configuration();
  // NOTE(review): presumably asks Hadoop to use its native compression
  // libraries when available -- confirm against the Hadoop version in use.
  base.setBoolean("hadoop.native.lib", true);
  this.conf = base;
  this.compressName = name;
}

/**
 * Supplies the codec implementing this algorithm.
 *
 * @param conf configuration the codec may be created from
 * @return the codec, or {@code null} for an algorithm that has none
 */
abstract CompressionCodec getCodec(Configuration conf);
159
160 public InputStream createDecompressionStream(
161 InputStream downStream, Decompressor decompressor,
162 int downStreamBufferSize) throws IOException {
163 CompressionCodec codec = getCodec(conf);
164
165 if (downStreamBufferSize > 0) {
166 ((Configurable)codec).getConf().setInt("io.file.buffer.size",
167 downStreamBufferSize);
168 }
169 CompressionInputStream cis =
170 codec.createInputStream(downStream, decompressor);
171 BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
172 return bis2;
173
174 }
175
176 public OutputStream createCompressionStream(
177 OutputStream downStream, Compressor compressor, int downStreamBufferSize)
178 throws IOException {
179 CompressionCodec codec = getCodec(conf);
180 OutputStream bos1 = null;
181 if (downStreamBufferSize > 0) {
182 bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
183 }
184 else {
185 bos1 = downStream;
186 }
187 ((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
188 CompressionOutputStream cos =
189 codec.createOutputStream(bos1, compressor);
190 BufferedOutputStream bos2 =
191 new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
192 DATA_OBUF_SIZE);
193 return bos2;
194 }
195
196 public Compressor getCompressor() {
197 CompressionCodec codec = getCodec(conf);
198 if (codec != null) {
199 Compressor compressor = CodecPool.getCompressor(codec);
200 if (compressor != null) {
201 if (compressor.finished()) {
202
203
204 LOG
205 .warn("Compressor obtained from CodecPool is already finished()");
206
207
208 }
209 compressor.reset();
210 }
211 return compressor;
212 }
213 return null;
214 }
215
216 public void returnCompressor(Compressor compressor) {
217 if (compressor != null) {
218 CodecPool.returnCompressor(compressor);
219 }
220 }
221
222 public Decompressor getDecompressor() {
223 CompressionCodec codec = getCodec(conf);
224 if (codec != null) {
225 Decompressor decompressor = CodecPool.getDecompressor(codec);
226 if (decompressor != null) {
227 if (decompressor.finished()) {
228
229
230 LOG
231 .warn("Deompressor obtained from CodecPool is already finished()");
232
233
234 }
235 decompressor.reset();
236 }
237 return decompressor;
238 }
239
240 return null;
241 }
242
243 public void returnDecompressor(Decompressor decompressor) {
244 if (decompressor != null) {
245 CodecPool.returnDecompressor(decompressor);
246 }
247 }
248
249 public String getName() {
250 return compressName;
251 }
252 }
253
254 public static Algorithm getCompressionAlgorithmByName(String compressName) {
255 Algorithm[] algos = Algorithm.class.getEnumConstants();
256
257 for (Algorithm a : algos) {
258 if (a.getName().equals(compressName)) {
259 return a;
260 }
261 }
262
263 throw new IllegalArgumentException(
264 "Unsupported compression algorithm name: " + compressName);
265 }
266
267 static String[] getSupportedAlgorithms() {
268 Algorithm[] algos = Algorithm.class.getEnumConstants();
269
270 String[] ret = new String[algos.length];
271 int i = 0;
272 for (Algorithm a : algos) {
273 ret[i++] = a.getName();
274 }
275
276 return ret;
277 }
278 }