/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.converter.stream;

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.camel.StreamCache;
import org.apache.camel.converter.IOConverter;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;

/**
 * This output stream stores its content in a file once the amount of data written exceeds the
 * THRESHOLD, whose default value is 64K. The temp file is stored in the temp directory, which you
 * can configure by setting the TEMP_DIR property. If you don't set the TEMP_DIR property,
 * the directory set by the system property "java.io.tmpdir" is chosen.
 * You can get a cached input stream of this stream. The temp file which is created with this
 * output stream will be deleted when you close this output stream or the cached inputStream.
 */
public class CachedOutputStream extends OutputStream {
    public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
    public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";

    protected boolean outputLocked;
    protected OutputStream currentStream;

    // Streams (the file output stream and handed-out file input streams) that still
    // reference the temp file; the file is only deleted once this list is empty.
    private final List<Object> streamList = new ArrayList<Object>();
    private long threshold = 64 * 1024;
    private int totalLength;
    private boolean inMemory;
    private File tempFile;
    private File outputDir;

    /**
     * Creates a stream that starts out buffering in memory and uses the default threshold.
     */
    public CachedOutputStream() {
        currentStream = new ByteArrayOutputStream(2048);
        inMemory = true;
    }

    /**
     * Creates a stream that starts out buffering in memory.
     *
     * @param threshold number of written bytes above which the content is moved to a temp file
     */
    public CachedOutputStream(long threshold) {
        this();
        this.threshold = threshold;
    }

    /**
     * Creates a stream configured from the {@link #THRESHOLD} and {@link #TEMP_DIR} properties.
     * A threshold that is not greater than zero is ignored. The temp directory is only used
     * when it exists and is a directory.
     *
     * @param properties the configuration properties
     * @throws NumberFormatException if the threshold value cannot be parsed as an integer
     */
    public CachedOutputStream(Map<String, String> properties) {
        this();
        String value = properties.get(THRESHOLD);
        if (value != null) {
            int i = Integer.parseInt(value);
            if (i > 0) {
                threshold = i;
            }
        }
        value = properties.get(TEMP_DIR);
        if (value != null) {
            File f = new File(value);
            if (f.exists() && f.isDirectory()) {
                outputDir = f;
            } else {
                outputDir = null;
            }
        } else {
            outputDir = null;
        }
    }

    /**
     * Perform any actions required on stream flush (freeze headers, reset
     * output stream ... etc.)
     */
    protected void doFlush() throws IOException {
    }

    /**
     * Flushes the current underlying stream and then invokes {@link #doFlush()}.
     */
    @Override
    public void flush() throws IOException {
        currentStream.flush();
        doFlush();
    }

    /**
     * Perform any actions required on stream closure (handle response etc.)
     */
    protected void doClose() throws IOException {
    }

    /**
     * Perform any actions required after stream closure (close the other related stream etc.)
     */
    protected void postClose() throws IOException {
    }

    /**
     * Locks the output stream to prevent additional writes, but maintains
     * a pointer to it so an InputStream can be obtained
     * @throws IOException
     */
    public void lockOutputStream() throws IOException {
        currentStream.flush();
        outputLocked = true;
        streamList.remove(currentStream);
    }

    /**
     * Flushes and closes the current underlying stream, calling {@link #doClose()} before and
     * {@link #postClose()} after. The temp file (if any) is deleted when no other registered
     * stream still references it.
     */
    @Override
    public void close() throws IOException {
        currentStream.flush();
        doClose();
        currentStream.close();
        maybeDeleteTempFile(currentStream);
        postClose();
    }

    /**
     * Compares on the underlying stream: this stream is equal to itself, to another
     * {@code CachedOutputStream} whose underlying stream is equal, and to any object the
     * underlying stream itself considers equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            // plain delegation to currentStream would make equals non-reflexive
            return true;
        }
        if (obj instanceof CachedOutputStream) {
            return currentStream.equals(((CachedOutputStream) obj).currentStream);
        }
        return currentStream.equals(obj);
    }

    /**
     * Replace the original stream with the new one, optionally copying the content of the old one
     * into the new one.
     * When with Attachment, needs to replace the xml writer stream with the stream used by
     * AttachmentSerializer or copy the cached output stream to the "real"
     * output stream, i.e. onto the wire.
     *
     * @param out the new output stream; when <tt>null</tt> a new in-memory buffer is used
     * @param copyOldContent flag indicating if the old content should be copied
     * @throws IOException if copying fails, or if the current in-memory stream is not a
     *                     {@link ByteArrayOutputStream}
     */
    public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
        if (out == null) {
            out = new ByteArrayOutputStream();
        }

        if (currentStream instanceof CachedOutputStream) {
            // NOTE(review): this branch copies regardless of copyOldContent - kept as is
            CachedOutputStream ac = (CachedOutputStream) currentStream;
            InputStream in = ac.getInputStream();
            IOHelper.copyAndCloseInput(in, out);
        } else {
            if (inMemory) {
                if (currentStream instanceof ByteArrayOutputStream) {
                    ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
                    if (copyOldContent && byteOut.size() > 0) {
                        byteOut.writeTo(out);
                    }
                } else {
                    throw new IOException("Unknown format of currentStream: " + currentStream);
                }
            } else {
                // read the file
                currentStream.close();
                if (copyOldContent) {
                    // only open the file when it is actually copied, so no stream is left
                    // open (which would leak a handle and may block the delete below)
                    FileInputStream fin = new FileInputStream(tempFile);
                    IOHelper.copyAndCloseInput(fin, out);
                }
                streamList.remove(currentStream);
                tempFile.delete();
                tempFile = null;
                inMemory = true;
            }
        }
        currentStream = out;
        outputLocked = false;
    }

    /**
     * Copies the input stream to the output stream by delegating to
     * {@link IOHelper#copyAndCloseInput(InputStream, OutputStream, int)}.
     *
     * @param in         the source
     * @param out        the destination
     * @param bufferSize the buffer size handed to the helper
     */
    public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException {
        IOHelper.copyAndCloseInput(in, out, bufferSize);
    }

    /**
     * @return the total number of bytes accepted by the write methods so far
     */
    public int size() {
        return totalLength;
    }

    /**
     * Returns the cached content as a byte array.
     *
     * @throws IOException if the temp file cannot be read, or if the current in-memory
     *                     stream is not a {@link ByteArrayOutputStream}
     */
    public byte[] getBytes() throws IOException {
        flush();
        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return ((ByteArrayOutputStream) currentStream).toByteArray();
            } else {
                throw new IOException("Unknown format of currentStream");
            }
        } else {
            // read the file
            FileInputStream fin = new FileInputStream(tempFile);
            try {
                return IOConverter.toBytes(fin);
            } finally {
                // make sure the file handle is released whether or not the converter closed it
                fin.close();
            }
        }
    }

    /**
     * Writes the cached content to the given output stream.
     *
     * @param out the destination
     * @throws IOException if copying fails, or if the current in-memory stream is not a
     *                     {@link ByteArrayOutputStream}
     */
    public void writeCacheTo(OutputStream out) throws IOException {
        flush();
        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                ((ByteArrayOutputStream) currentStream).writeTo(out);
            } else {
                throw new IOException("Unknown format of currentStream");
            }
        } else {
            // read the file
            FileInputStream fin = new FileInputStream(tempFile);
            IOHelper.copyAndCloseInput(fin, out);
        }
    }

    /**
     * Appends at most <tt>limit</tt> bytes of the cached content, converted to a String,
     * to the given builder.
     *
     * @param out   the builder to append to
     * @param limit maximum number of bytes to append, or <tt>-1</tt> for no limit
     * @throws IOException if the temp file cannot be read, or if the current in-memory
     *                     stream is not a {@link ByteArrayOutputStream}
     */
    public void writeCacheTo(StringBuilder out, int limit) throws IOException {
        flush();
        if (totalLength < limit
            || limit == -1) {
            writeCacheTo(out);
            return;
        }

        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                byte[] bytes = ((ByteArrayOutputStream) currentStream).toByteArray();
                // totalLength is not reset when the buffer is replaced, so the current
                // buffer may hold fewer than limit bytes; never read past its end
                int length = Math.min(limit, bytes.length);
                out.append(IOHelper.newStringFromBytes(bytes, 0, length));
            } else {
                throw new IOException("Unknown format of currentStream: " + currentStream);
            }
        } else {
            // read the file
            FileInputStream fin = new FileInputStream(tempFile);
            try {
                int count = 0;
                byte[] bytes = new byte[1024];
                int x = fin.read(bytes);
                while (x != -1) {
                    if ((count + x) > limit) {
                        // truncate the last chunk so that exactly limit bytes are appended
                        x = limit - count;
                    }
                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
                    count += x;

                    if (count >= limit) {
                        x = -1;
                    } else {
                        x = fin.read(bytes);
                    }
                }
            } finally {
                fin.close();
            }
        }
    }

    /**
     * Appends the whole cached content, converted to a String, to the given builder.
     *
     * @param out the builder to append to
     * @throws IOException if the temp file cannot be read, or if the current in-memory
     *                     stream is not a {@link ByteArrayOutputStream}
     */
    public void writeCacheTo(StringBuilder out) throws IOException {
        flush();
        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                byte[] bytes = ((ByteArrayOutputStream) currentStream).toByteArray();
                out.append(IOHelper.newStringFromBytes(bytes));
            } else {
                throw new IOException("Unknown format of currentStream: " + currentStream);
            }
        } else {
            // read the file
            FileInputStream fin = new FileInputStream(tempFile);
            try {
                // NOTE(review): each 1024 byte chunk is decoded on its own; if the helper uses
                // a multi-byte charset, a character split across chunks may be garbled - confirm
                byte[] bytes = new byte[1024];
                int x = fin.read(bytes);
                while (x != -1) {
                    out.append(IOHelper.newStringFromBytes(bytes, 0, x));
                    x = fin.read(bytes);
                }
            } finally {
                fin.close();
            }
        }
    }

    /**
     * @return the underlying output stream
     */
    public OutputStream getOut() {
        return currentStream;
    }

    /**
     * @return the hash code of the underlying output stream
     */
    @Override
    public int hashCode() {
        return currentStream.hashCode();
    }

    /**
     * @return the class name followed by the cached content converted to a String
     */
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder().append("[")
            .append(CachedOutputStream.class.getName())
            .append(" Content: ");
        try {
            writeCacheTo(builder);
        } catch (IOException ignored) {
            // best effort only: toString must not fail, so whatever content was appended
            // before the failure is returned
        }
        return builder.append("]").toString();
    }

    /**
     * Hook invoked before every write that is not suppressed by the output lock.
     */
    protected void onWrite() throws IOException {
    }

    /**
     * Writes the given range of bytes; the call is silently ignored while the output is locked.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (!outputLocked) {
            onWrite();
            accountForWrite(len);
            currentStream.write(b, off, len);
        }
    }

    /**
     * Writes all given bytes; the call is silently ignored while the output is locked.
     */
    @Override
    public void write(byte[] b) throws IOException {
        if (!outputLocked) {
            onWrite();
            accountForWrite(b.length);
            currentStream.write(b);
        }
    }

    /**
     * Writes a single byte; the call is silently ignored while the output is locked.
     */
    @Override
    public void write(int b) throws IOException {
        if (!outputLocked) {
            onWrite();
            accountForWrite(1);
            currentStream.write(b);
        }
    }

    /**
     * Adds the length of a pending write to the total and moves the in-memory buffer
     * to a temp file once the total exceeds the threshold.
     */
    private void accountForWrite(int len) throws IOException {
        this.totalLength += len;
        if (inMemory && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) {
            createFileOutputStream();
        }
    }

    /**
     * Creates the temp file, makes a buffered stream on it the current stream and
     * copies the content buffered in memory so far into it.
     */
    private void createFileOutputStream() throws IOException {
        ByteArrayOutputStream bout = (ByteArrayOutputStream) currentStream;
        if (outputDir == null) {
            tempFile = FileUtil.createTempFile("cos", "tmp");
        } else {
            tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
        }

        currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
        bout.writeTo(currentStream);
        inMemory = false;
        streamList.add(currentStream);
    }

    /**
     * @return the temp file, or <tt>null</tt> if there is none or it no longer exists
     */
    public File getTempFile() {
        return tempFile != null && tempFile.exists() ? tempFile : null;
    }

    /**
     * Returns an input stream over the cached content. For file backed content the returned
     * stream is registered with this cache, and closing it may delete the temp file.
     *
     * @return the input stream, or <tt>null</tt> if the content is in memory but the current
     *         stream is not a {@link ByteArrayOutputStream}
     * @throws IOException if flushing fails or the temp file no longer exists
     */
    public InputStream getInputStream() throws IOException {
        flush();
        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
            } else {
                return null;
            }
        } else {
            try {
                FileInputStream fileInputStream = new FileInputStream(tempFile) {
                    @Override
                    public void close() throws IOException {
                        super.close();
                        // unregister this reader; the last registered stream removes the file
                        maybeDeleteTempFile(this);
                    }
                };
                streamList.add(fileInputStream);
                return fileInputStream;
            } catch (FileNotFoundException e) {
                throw IOHelper.createIOException("Cached file was already deleted", e);
            }
        }
    }

    /**
     * Returns the cached content as a {@link StreamCache}.
     *
     * @return the stream cache, or <tt>null</tt> if the content is in memory but the current
     *         stream is not a {@link ByteArrayOutputStream}
     * @throws IOException if flushing fails or the temp file no longer exists
     */
    public StreamCache getStreamCache() throws IOException {
        flush();
        if (inMemory) {
            if (currentStream instanceof ByteArrayOutputStream) {
                return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
            } else {
                return null;
            }
        } else {
            try {
                return new FileInputStreamCache(tempFile, this);
            } catch (FileNotFoundException e) {
                throw IOHelper.createIOException("Cached file was already deleted", e);
            }
        }
    }

    /**
     * Unregisters the given stream and, when no registered stream is left for a file backed
     * cache, deletes the temp file and switches back to a fresh in-memory buffer.
     */
    private void maybeDeleteTempFile(Object stream) {
        streamList.remove(stream);
        if (!inMemory && tempFile != null && streamList.isEmpty()) {
            tempFile.delete();
            tempFile = null;
            currentStream = new ByteArrayOutputStream(1024);
            inMemory = true;
        }
    }

    /**
     * Sets the directory in which the temp file is created.
     *
     * @param outputDir the directory, or <tt>null</tt> to use the default temp directory
     */
    public void setOutputDir(File outputDir) throws IOException {
        this.outputDir = outputDir;
    }

    /**
     * Sets the number of written bytes above which the content is moved to a temp file.
     */
    public void setThreshold(long threshold) {
        this.threshold = threshold;
    }

}