001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.converter.stream; 018 019 import java.io.BufferedOutputStream; 020 import java.io.ByteArrayInputStream; 021 import java.io.ByteArrayOutputStream; 022 import java.io.File; 023 import java.io.FileInputStream; 024 import java.io.FileNotFoundException; 025 import java.io.FileOutputStream; 026 import java.io.IOException; 027 import java.io.InputStream; 028 import java.io.OutputStream; 029 import java.util.ArrayList; 030 import java.util.List; 031 import java.util.Map; 032 033 import org.apache.camel.StreamCache; 034 import org.apache.camel.converter.IOConverter; 035 import org.apache.camel.util.FileUtil; 036 import org.apache.camel.util.IOHelper; 037 038 /** 039 * This output stream will store the content into a File if the stream context size is exceed the 040 * THRESHOLD which's default value is 64K. The temp file will store in the temp directory, you 041 * can configure it by setting the TEMP_DIR property. If you don't set the TEMP_DIR property, 042 * it will choice the directory which is set by the system property of "java.io.tmpdir". 043 * You can get a cached input stream of this stream. The temp file which is created with this 044 * output stream will be deleted when you close this output stream or the cached inputStream. 045 */ 046 public class CachedOutputStream extends OutputStream { 047 public static final String THRESHOLD = "CamelCachedOutputStreamThreshold"; 048 public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory"; 049 050 protected boolean outputLocked; 051 protected OutputStream currentStream; 052 053 private long threshold = 64 * 1024; 054 055 private int totalLength; 056 057 private boolean inmem; 058 059 private File tempFile; 060 061 private File outputDir; 062 063 private List<Object> streamList = new ArrayList<Object>(); 064 065 066 public CachedOutputStream() { 067 currentStream = new ByteArrayOutputStream(2048); 068 inmem = true; 069 } 070 071 public CachedOutputStream(long threshold) { 072 this(); 073 this.threshold = threshold; 074 } 075 076 public CachedOutputStream(Map<String, String> properties) { 077 this(); 078 String value = properties.get(THRESHOLD); 079 if (value != null) { 080 int i = Integer.parseInt(value); 081 if (i > 0) { 082 threshold = i; 083 } 084 } 085 value = properties.get(TEMP_DIR); 086 if (value != null) { 087 File f = new File(value); 088 if (f.exists() && f.isDirectory()) { 089 outputDir = f; 090 } else { 091 outputDir = null; 092 } 093 } else { 094 outputDir = null; 095 } 096 } 097 098 /** 099 * Perform any actions required on stream flush (freeze headers, reset 100 * output stream ... etc.) 101 */ 102 protected void doFlush() throws IOException { 103 104 } 105 106 public void flush() throws IOException { 107 currentStream.flush(); 108 doFlush(); 109 } 110 111 /** 112 * Perform any actions required on stream closure (handle response etc.) 113 */ 114 protected void doClose() throws IOException { 115 116 } 117 118 /** 119 * Perform any actions required after stream closure (close the other related stream etc.) 120 */ 121 protected void postClose() throws IOException { 122 123 } 124 125 /** 126 * Locks the output stream to prevent additional writes, but maintains 127 * a pointer to it so an InputStream can be obtained 128 * @throws IOException 129 */ 130 public void lockOutputStream() throws IOException { 131 currentStream.flush(); 132 outputLocked = true; 133 streamList.remove(currentStream); 134 } 135 136 public void close() throws IOException { 137 currentStream.flush(); 138 doClose(); 139 currentStream.close(); 140 maybeDeleteTempFile(currentStream); 141 postClose(); 142 } 143 144 public boolean equals(Object obj) { 145 return currentStream.equals(obj); 146 } 147 148 /** 149 * Replace the original stream with the new one, optionally copying the content of the old one 150 * into the new one. 151 * When with Attachment, needs to replace the xml writer stream with the stream used by 152 * AttachmentSerializer or copy the cached output stream to the "real" 153 * output stream, i.e. onto the wire. 154 * 155 * @param out the new output stream 156 * @param copyOldContent flag indicating if the old content should be copied 157 * @throws IOException 158 */ 159 public void resetOut(OutputStream out, boolean copyOldContent) throws IOException { 160 if (out == null) { 161 out = new ByteArrayOutputStream(); 162 } 163 164 if (currentStream instanceof CachedOutputStream) { 165 CachedOutputStream ac = (CachedOutputStream) currentStream; 166 InputStream in = ac.getInputStream(); 167 IOHelper.copyAndCloseInput(in, out); 168 } else { 169 if (inmem) { 170 if (currentStream instanceof ByteArrayOutputStream) { 171 ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream; 172 if (copyOldContent && byteOut.size() > 0) { 173 byteOut.writeTo(out); 174 } 175 } else { 176 throw new IOException("Unknown format of currentStream"); 177 } 178 } else { 179 // read the file 180 currentStream.close(); 181 FileInputStream fin = new FileInputStream(tempFile); 182 if (copyOldContent) { 183 IOHelper.copyAndCloseInput(fin, out); 184 } 185 streamList.remove(currentStream); 186 tempFile.delete(); 187 tempFile = null; 188 inmem = true; 189 } 190 } 191 currentStream = out; 192 outputLocked = false; 193 } 194 195 public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws IOException { 196 IOHelper.copyAndCloseInput(in, out, bufferSize); 197 } 198 199 public int size() { 200 return totalLength; 201 } 202 203 public byte[] getBytes() throws IOException { 204 flush(); 205 if (inmem) { 206 if (currentStream instanceof ByteArrayOutputStream) { 207 return ((ByteArrayOutputStream)currentStream).toByteArray(); 208 } else { 209 throw new IOException("Unknown format of currentStream"); 210 } 211 } else { 212 // read the file 213 FileInputStream fin = new FileInputStream(tempFile); 214 return IOConverter.toBytes(fin); 215 } 216 } 217 218 public void writeCacheTo(OutputStream out) throws IOException { 219 flush(); 220 if (inmem) { 221 if (currentStream instanceof ByteArrayOutputStream) { 222 ((ByteArrayOutputStream)currentStream).writeTo(out); 223 } else { 224 throw new IOException("Unknown format of currentStream"); 225 } 226 } else { 227 // read the file 228 FileInputStream fin = new FileInputStream(tempFile); 229 IOHelper.copyAndCloseInput(fin, out); 230 } 231 } 232 233 234 public void writeCacheTo(StringBuilder out, int limit) throws IOException { 235 flush(); 236 if (totalLength < limit 237 || limit == -1) { 238 writeCacheTo(out); 239 return; 240 } 241 242 int count = 0; 243 if (inmem) { 244 if (currentStream instanceof ByteArrayOutputStream) { 245 byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray(); 246 out.append(IOHelper.newStringFromBytes(bytes, 0, limit)); 247 } else { 248 throw new IOException("Unknown format of currentStream"); 249 } 250 } else { 251 // read the file 252 FileInputStream fin = new FileInputStream(tempFile); 253 byte bytes[] = new byte[1024]; 254 int x = fin.read(bytes); 255 while (x != -1) { 256 if ((count + x) > limit) { 257 x = limit - count; 258 } 259 out.append(IOHelper.newStringFromBytes(bytes, 0, x)); 260 count += x; 261 262 if (count >= limit) { 263 x = -1; 264 } else { 265 x = fin.read(bytes); 266 } 267 } 268 fin.close(); 269 } 270 } 271 public void writeCacheTo(StringBuilder out) throws IOException { 272 flush(); 273 if (inmem) { 274 if (currentStream instanceof ByteArrayOutputStream) { 275 byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray(); 276 out.append(IOHelper.newStringFromBytes(bytes)); 277 } else { 278 throw new IOException("Unknown format of currentStream"); 279 } 280 } else { 281 // read the file 282 FileInputStream fin = new FileInputStream(tempFile); 283 byte bytes[] = new byte[1024]; 284 int x = fin.read(bytes); 285 while (x != -1) { 286 out.append(IOHelper.newStringFromBytes(bytes, 0, x)); 287 x = fin.read(bytes); 288 } 289 fin.close(); 290 } 291 } 292 293 294 /** 295 * @return the underlying output stream 296 */ 297 public OutputStream getOut() { 298 return currentStream; 299 } 300 301 public int hashCode() { 302 return currentStream.hashCode(); 303 } 304 305 public String toString() { 306 StringBuilder builder = new StringBuilder().append("[") 307 .append(CachedOutputStream.class.getName()) 308 .append(" Content: "); 309 try { 310 writeCacheTo(builder); 311 } catch (IOException e) { 312 //ignore 313 } 314 return builder.append("]").toString(); 315 } 316 317 protected void onWrite() throws IOException { 318 319 } 320 321 public void write(byte[] b, int off, int len) throws IOException { 322 if (!outputLocked) { 323 onWrite(); 324 this.totalLength += len; 325 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 326 createFileOutputStream(); 327 } 328 currentStream.write(b, off, len); 329 } 330 } 331 332 public void write(byte[] b) throws IOException { 333 if (!outputLocked) { 334 onWrite(); 335 this.totalLength += b.length; 336 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 337 createFileOutputStream(); 338 } 339 currentStream.write(b); 340 } 341 } 342 343 public void write(int b) throws IOException { 344 if (!outputLocked) { 345 onWrite(); 346 this.totalLength++; 347 if (inmem && totalLength > threshold && currentStream instanceof ByteArrayOutputStream) { 348 createFileOutputStream(); 349 } 350 currentStream.write(b); 351 } 352 } 353 354 private void createFileOutputStream() throws IOException { 355 ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream; 356 if (outputDir == null) { 357 tempFile = FileUtil.createTempFile("cos", "tmp"); 358 } else { 359 tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false); 360 } 361 362 currentStream = new BufferedOutputStream(new FileOutputStream(tempFile)); 363 bout.writeTo(currentStream); 364 inmem = false; 365 streamList.add(currentStream); 366 } 367 368 public File getTempFile() { 369 return tempFile != null && tempFile.exists() ? tempFile : null; 370 } 371 372 public InputStream getInputStream() throws IOException { 373 flush(); 374 if (inmem) { 375 if (currentStream instanceof ByteArrayOutputStream) { 376 return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray()); 377 } else { 378 return null; 379 } 380 } else { 381 try { 382 FileInputStream fileInputStream = new FileInputStream(tempFile) { 383 public void close() throws IOException { 384 super.close(); 385 maybeDeleteTempFile(this); 386 } 387 }; 388 streamList.add(fileInputStream); 389 return fileInputStream; 390 } catch (FileNotFoundException e) { 391 throw new IOException("Cached file was deleted, " + e.toString()); 392 } 393 } 394 } 395 396 public StreamCache getStreamCache() throws IOException { 397 flush(); 398 if (inmem) { 399 if (currentStream instanceof ByteArrayOutputStream) { 400 return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray()); 401 } else { 402 return null; 403 } 404 } else { 405 try { 406 FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile, this); 407 return fileInputStream; 408 } catch (FileNotFoundException e) { 409 throw new IOException("Cached file was deleted, " + e.toString()); 410 } 411 } 412 } 413 414 private void maybeDeleteTempFile(Object stream) { 415 streamList.remove(stream); 416 if (!inmem && tempFile != null && streamList.isEmpty()) { 417 tempFile.delete(); 418 tempFile = null; 419 currentStream = new ByteArrayOutputStream(1024); 420 inmem = true; 421 } 422 } 423 424 public void setOutputDir(File outputDir) throws IOException { 425 this.outputDir = outputDir; 426 } 427 public void setThreshold(long threshold) { 428 this.threshold = threshold; 429 } 430 431 }