001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.io.File; 020 import java.io.InputStream; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Expression; 024 import org.apache.camel.impl.DefaultProducer; 025 import org.apache.camel.spi.Language; 026 import org.apache.camel.util.ExchangeHelper; 027 import org.apache.camel.util.FileUtil; 028 import org.apache.camel.util.ObjectHelper; 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 032 /** 033 * Generic file producer 034 */ 035 public class GenericFileProducer<T> extends DefaultProducer { 036 protected final transient Log log = LogFactory.getLog(getClass()); 037 protected final GenericFileEndpoint<T> endpoint; 038 protected final GenericFileOperations<T> operations; 039 040 protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { 041 super(endpoint); 042 this.endpoint = endpoint; 043 this.operations = operations; 044 } 045 046 protected String getFileSeparator() { 047 return File.separator; 048 } 049 050 protected String normalizePath(String name) { 051 return FileUtil.normalizePath(name); 052 } 053 054 @SuppressWarnings("unchecked") 055 public void process(Exchange exchange) throws Exception { 056 GenericFileExchange<T> fileExchange = (GenericFileExchange<T>) endpoint.createExchange(exchange); 057 processExchange(fileExchange); 058 ExchangeHelper.copyResults(exchange, fileExchange); 059 } 060 061 /** 062 * Perform the work to process the fileExchange 063 * 064 * @param exchange fileExchange 065 * @throws Exception is thrown if some error 066 */ 067 protected void processExchange(GenericFileExchange<T> exchange) throws Exception { 068 if (log.isTraceEnabled()) { 069 log.trace("Processing " + exchange); 070 } 071 072 try { 073 String target = createFileName(exchange); 074 075 preWriteCheck(); 076 077 // should we write to a temporary name and then afterwards rename to real target 078 boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempPrefix()); 079 String tempTarget = null; 080 if (writeAsTempAndRename) { 081 // compute temporary name with the temp prefix 082 tempTarget = createTempFileName(target); 083 } 084 085 // upload the file 086 writeFile(exchange, tempTarget != null ? tempTarget : target); 087 088 // if we did write to a temporary name then rename it to the real 089 // name after we have written the file 090 if (tempTarget != null) { 091 if (log.isTraceEnabled()) { 092 log.trace("Renaming file: [" + tempTarget + "] to: [" + target + "]"); 093 } 094 boolean renamed = operations.renameFile(tempTarget, target); 095 if (!renamed) { 096 throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target); 097 } 098 } 099 100 // lets store the name we really used in the header, so end-users 101 // can retrieve it 102 exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target); 103 } catch (Exception e) { 104 handleFailedWrite(exchange, e); 105 } 106 } 107 108 /** 109 * If we fail writing out a file, we will call this method. This hook is 110 * provided to disconnect from servers or clean up files we created (if needed). 111 */ 112 protected void handleFailedWrite(GenericFileExchange<T> exchange, Exception exception) throws Exception { 113 throw exception; 114 } 115 116 /** 117 * Perform any actions that need to occur before we write such as connecting to an FTP server etc. 118 */ 119 protected void preWriteCheck() throws Exception { 120 } 121 122 protected void writeFile(GenericFileExchange<T> exchange, String fileName) throws GenericFileOperationFailedException { 123 InputStream payload = exchange.getIn().getBody(InputStream.class); 124 try { 125 // build directory if auto create is enabled 126 if (endpoint.isAutoCreate()) { 127 // use java.io.File to compute the file path 128 File file = new File(fileName); 129 String directory = file.getParent(); 130 boolean absolute = file.isAbsolute(); 131 if (directory != null) { 132 if (!operations.buildDirectory(directory, absolute)) { 133 log.debug("Cannot build directory [" + directory + "] (could be because of denied permissions)"); 134 } 135 } 136 } 137 138 // upload 139 if (log.isTraceEnabled()) { 140 log.trace("About to write [" + fileName + "] to [" + getEndpoint() + "] from exchange [" + exchange + "]"); 141 } 142 143 boolean success = operations.storeFile(fileName, exchange); 144 if (!success) { 145 throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]"); 146 } 147 if (log.isDebugEnabled()) { 148 log.debug("Wrote [" + fileName + "] to [" + getEndpoint() + "]"); 149 } 150 151 } finally { 152 ObjectHelper.close(payload, "Closing payload", log); 153 } 154 155 } 156 157 protected String createFileName(Exchange exchange) { 158 String answer; 159 160 String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class); 161 162 // expression support 163 Expression expression = endpoint.getFileName(); 164 if (name != null) { 165 // the header name can be an expression too, that should override 166 // whatever configured on the endpoint 167 if (name.indexOf("${") > -1) { 168 if (log.isDebugEnabled()) { 169 log.debug(Exchange.FILE_NAME + " contains a FileLanguage expression: " + name); 170 } 171 Language language = getEndpoint().getCamelContext().resolveLanguage("file"); 172 expression = language.createExpression(name); 173 } 174 } 175 if (expression != null) { 176 if (log.isDebugEnabled()) { 177 log.debug("Filename evaluated as expression: " + expression); 178 } 179 name = expression.evaluate(exchange, String.class); 180 } 181 182 // flatten name 183 if (name != null && endpoint.isFlatten()) { 184 int pos = name.lastIndexOf(getFileSeparator()); 185 if (pos == -1) { 186 pos = name.lastIndexOf('/'); 187 } 188 if (pos != -1) { 189 name = name.substring(pos + 1); 190 } 191 } 192 193 // compute path by adding endpoint starting directory 194 String endpointPath = endpoint.getConfiguration().getDirectory(); 195 // Its a directory so we should use it as a base path for the filename 196 // If the path isn't empty, we need to add a trailing / if it isn't already there 197 String baseDir = ""; 198 if (endpointPath.length() > 0) { 199 baseDir = endpointPath + (endpointPath.endsWith(getFileSeparator()) ? "" : getFileSeparator()); 200 } 201 if (name != null) { 202 answer = baseDir + name; 203 } else { 204 // use a generated filename if no name provided 205 answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn()); 206 } 207 208 // must normalize path to cater for Windows and other OS 209 answer = normalizePath(answer); 210 211 return answer; 212 } 213 214 protected String createTempFileName(String fileName) { 215 // must normalize path to cater for Windows and other OS 216 fileName = normalizePath(fileName); 217 218 int path = fileName.lastIndexOf(getFileSeparator()); 219 if (path == -1) { 220 // no path 221 return endpoint.getTempPrefix() + fileName; 222 } else { 223 StringBuilder sb = new StringBuilder(fileName); 224 sb.insert(path + 1, endpoint.getTempPrefix()); 225 return sb.toString(); 226 } 227 } 228 229 }