001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.converter.stream;
018    
019    import java.io.BufferedOutputStream;
020    import java.io.ByteArrayInputStream;
021    import java.io.ByteArrayOutputStream;
022    import java.io.File;
023    import java.io.FileInputStream;
024    import java.io.FileNotFoundException;
025    import java.io.FileOutputStream;
026    import java.io.IOException;
027    import java.io.InputStream;
028    import java.io.OutputStream;
029    import java.util.ArrayList;
030    import java.util.List;
031    import java.util.Map;
032    
033    import org.apache.camel.StreamCache;
034    import org.apache.camel.converter.IOConverter;
035    import org.apache.camel.util.FileUtil;
036    import org.apache.camel.util.IOHelper;
037    
038    /**
039     * This output stream will store the content into a File if the stream context size is exceed the
040     * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you 
041     * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
042     * it will choice the directory which is set by the system property of "java.io.tmpdir".
043     * You can get a cached input stream of this stream. The temp file which is created with this 
044     * output stream will be deleted when you close this output stream or the cached inputStream.
045     */
046    public class CachedOutputStream extends OutputStream {
047        public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
048        public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
049       
050        protected boolean outputLocked;
051        protected OutputStream currentStream;
052    
053        private long threshold = 64 * 1024;
054    
055        private int totalLength;
056    
057        private boolean inmem;
058    
059        private File tempFile;
060    
061        private File outputDir;   
062        
063        private List<Object> streamList = new ArrayList<Object>();
064    
065        
066        public CachedOutputStream() {
067            currentStream = new ByteArrayOutputStream(2048);
068            inmem = true;
069        }
070    
071        public CachedOutputStream(long threshold) {
072            this();
073            this.threshold = threshold;        
074        }
075        
076        public CachedOutputStream(Map<String, String> properties) {
077            this();
078            String value = properties.get(THRESHOLD);
079            if (value != null) {
080                int i = Integer.parseInt(value);
081                if (i > 0) {
082                    threshold = i;
083                }
084            }
085            value = properties.get(TEMP_DIR);
086            if (value != null) {
087                File f = new File(value);
088                if (f.exists() && f.isDirectory()) {
089                    outputDir = f;
090                } else {
091                    outputDir = null;
092                }
093            } else {
094                outputDir = null;
095            }        
096        }
097    
098        /**
099         * Perform any actions required on stream flush (freeze headers, reset
100         * output stream ... etc.)
101         */
102        protected void doFlush() throws IOException {
103            
104        }
105    
106        public void flush() throws IOException {
107            currentStream.flush();       
108            doFlush();
109        }
110    
111        /**
112         * Perform any actions required on stream closure (handle response etc.)
113         */
114        protected void doClose() throws IOException {
115            
116        }
117        
118        /**
119         * Perform any actions required after stream closure (close the other related stream etc.)
120         */
121        protected void postClose() throws IOException {
122            
123        }
124    
125        /**
126         * Locks the output stream to prevent additional writes, but maintains
127         * a pointer to it so an InputStream can be obtained
128         * @throws IOException
129         */
130        public void lockOutputStream() throws IOException {
131            currentStream.flush();
132            outputLocked = true;
133            streamList.remove(currentStream);
134        }
135        
136        public void close() throws IOException {
137            currentStream.flush();        
138            doClose();
139            currentStream.close();
140            maybeDeleteTempFile(currentStream);
141            postClose();
142        }
143    
144        public boolean equals(Object obj) {
145            return currentStream.equals(obj);
146        }
147    
148        /**
149         * Replace the original stream with the new one, optionally copying the content of the old one
150         * into the new one.
151         * When with Attachment, needs to replace the xml writer stream with the stream used by
152         * AttachmentSerializer or copy the cached output stream to the "real"
153         * output stream, i.e. onto the wire.
154         * 
155         * @param out the new output stream
156         * @param copyOldContent flag indicating if the old content should be copied
157         * @throws IOException
158         */
159        public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
160            if (out == null) {
161                out = new ByteArrayOutputStream();
162            }
163    
164            if (currentStream instanceof CachedOutputStream) {
165                CachedOutputStream ac = (CachedOutputStream) currentStream;
166                InputStream in = ac.getInputStream();
167                IOHelper.copyAndCloseInput(in, out);
168            } else {
169                if (inmem) {
170                    if (currentStream instanceof ByteArrayOutputStream) {
171                        ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
172                        if (copyOldContent && byteOut.size() > 0) {
173                            byteOut.writeTo(out);
174                        }
175                    } else {
176                        throw new IOException("Unknown format of currentStream");
177                    }
178                } else {
179                    // read the file
180                    currentStream.close();
181                    FileInputStream fin = new FileInputStream(tempFile);
182                    if (copyOldContent) {
183                        IOHelper.copyAndCloseInput(fin, out);
184                    }
185                    streamList.remove(currentStream);
186                    tempFile.delete();
187                    tempFile = null;
188                    inmem = true;
189                }
190            }
191            currentStream = out;
192            outputLocked = false;
193        }
194    
195        public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
196            IOHelper.copyAndCloseInput(in, out, bufferSize);
197        }
198    
199        public int size() {
200            return totalLength;
201        }
202        
203        public byte[] getBytes() throws IOException {
204            flush();
205            if (inmem) {
206                if (currentStream instanceof ByteArrayOutputStream) {
207                    return ((ByteArrayOutputStream)currentStream).toByteArray();
208                } else {
209                    throw new IOException("Unknown format of currentStream");
210                }
211            } else {
212                // read the file
213                FileInputStream fin = new FileInputStream(tempFile);
214                return IOConverter.toBytes(fin);
215            }
216        }
217        
218        public void writeCacheTo(OutputStream out) throws IOException {
219            flush();
220            if (inmem) {
221                if (currentStream instanceof ByteArrayOutputStream) {
222                    ((ByteArrayOutputStream)currentStream).writeTo(out);
223                } else {
224                    throw new IOException("Unknown format of currentStream");
225                }
226            } else {
227                // read the file
228                FileInputStream fin = new FileInputStream(tempFile);
229                IOHelper.copyAndCloseInput(fin, out);
230            }
231        }
232        
233        
234        public void writeCacheTo(StringBuilder out, int limit) throws IOException {
235            flush();
236            if (totalLength < limit
237                || limit == -1) {
238                writeCacheTo(out);
239                return;
240            }
241            
242            int count = 0;
243            if (inmem) {
244                if (currentStream instanceof ByteArrayOutputStream) {
245                    byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
246                    out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
247                } else {
248                    throw new IOException("Unknown format of currentStream");
249                }
250            } else {
251                // read the file
252                FileInputStream fin = new FileInputStream(tempFile);
253                byte bytes[] = new byte[1024];
254                int x = fin.read(bytes);
255                while (x != -1) {
256                    if ((count + x) > limit) {
257                        x = limit - count;
258                    }
259                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
260                    count += x;
261                    
262                    if (count >= limit) {
263                        x = -1;
264                    } else {
265                        x = fin.read(bytes);
266                    }
267                }
268                fin.close();
269            }
270        }
271        public void writeCacheTo(StringBuilder out) throws IOException {
272            flush();
273            if (inmem) {
274                if (currentStream instanceof ByteArrayOutputStream) {
275                    byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
276                    out.append(IOHelper.newStringFromBytes(bytes));
277                } else {
278                    throw new IOException("Unknown format of currentStream");
279                }
280            } else {
281                // read the file
282                FileInputStream fin = new FileInputStream(tempFile);
283                byte bytes[] = new byte[1024];
284                int x = fin.read(bytes);
285                while (x != -1) {
286                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
287                    x = fin.read(bytes);
288                }
289                fin.close();
290            }
291        }    
292        
293    
294        /**
295         * @return the underlying output stream
296         */
297        public OutputStream getOut() {
298            return currentStream;
299        }
300    
301        public int hashCode() {
302            return currentStream.hashCode();
303        }
304    
305        public String toString() {
306            StringBuilder builder = new StringBuilder().append("[")
307                .append(CachedOutputStream.class.getName())
308                .append(" Content: ");
309            try {
310                writeCacheTo(builder);
311            } catch (IOException e) {
312                //ignore
313            }
314            return builder.append("]").toString();
315        }
316    
317        protected void onWrite() throws IOException {
318            
319        }
320    
321        public void write(byte[] b, int off, int len) throws IOException {
322            if (!outputLocked) {
323                onWrite();
324                this.totalLength += len;
325                if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
326                    createFileOutputStream();
327                }
328                currentStream.write(b, off, len);
329            }
330        }
331    
332        public void write(byte[] b) throws IOException {
333            if (!outputLocked) {
334                onWrite();
335                this.totalLength += b.length;
336                if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
337                    createFileOutputStream();
338                }
339                currentStream.write(b);
340            }
341        }
342    
343        public void write(int b) throws IOException {
344            if (!outputLocked) {
345                onWrite();
346                this.totalLength++;
347                if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
348                    createFileOutputStream();
349                }
350                currentStream.write(b);
351            }
352        }
353    
354        private void createFileOutputStream() throws IOException {
355            ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
356            if (outputDir == null) {
357                tempFile = FileUtil.createTempFile("cos", "tmp");
358            } else {
359                tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
360            }
361            
362            currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
363            bout.writeTo(currentStream);
364            inmem = false;
365            streamList.add(currentStream);
366        }
367    
368        public File getTempFile() {
369            return tempFile != null && tempFile.exists() ? tempFile : null;
370        }
371    
372        public InputStream getInputStream() throws IOException {
373            flush();
374            if (inmem) {
375                if (currentStream instanceof ByteArrayOutputStream) {
376                    return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
377                } else {
378                    return null;
379                }
380            } else {
381                try {
382                    FileInputStream fileInputStream = new FileInputStream(tempFile) {
383                        public void close() throws IOException {
384                            super.close();
385                            maybeDeleteTempFile(this);
386                        }
387                    };
388                    streamList.add(fileInputStream);
389                    return fileInputStream;
390                } catch (FileNotFoundException e) {
391                    throw new IOException("Cached file was deleted, " + e.toString());
392                }
393            }
394        }
395        
396        public StreamCache getStreamCache() throws IOException {
397            flush();
398            if (inmem) {
399                if (currentStream instanceof ByteArrayOutputStream) {
400                    return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
401                } else {
402                    return null;
403                }
404            } else {
405                try {
406                    FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this);
407                    return fileInputStream;
408                } catch (FileNotFoundException e) {
409                    throw new IOException("Cached file was deleted, " + e.toString());
410                }
411            }
412        }
413        
414        private void maybeDeleteTempFile(Object stream) {        
415            streamList.remove(stream);        
416            if (!inmem && tempFile != null && streamList.isEmpty()) {            
417                tempFile.delete();
418                tempFile = null;
419                currentStream = new ByteArrayOutputStream(1024);
420                inmem = true;
421            }
422        }
423    
424        public void setOutputDir(File outputDir) throws IOException {
425            this.outputDir = outputDir;
426        }
427        public void setThreshold(long threshold) {
428            this.threshold = threshold;
429        }
430    
431    }