001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.io.File; 020 import java.io.InputStream; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Expression; 024 import org.apache.camel.impl.DefaultProducer; 025 import org.apache.camel.language.simple.FileLanguage; 026 import org.apache.camel.util.ExchangeHelper; 027 import org.apache.camel.util.FileUtil; 028 import org.apache.camel.util.ObjectHelper; 029 import org.apache.commons.logging.Log; 030 import org.apache.commons.logging.LogFactory; 031 032 /** 033 * Generic file producer 034 */ 035 public class GenericFileProducer<T> extends DefaultProducer { 036 protected final transient Log log = LogFactory.getLog(getClass()); 037 protected final GenericFileEndpoint<T> endpoint; 038 protected final GenericFileOperations<T> operations; 039 040 protected GenericFileProducer(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { 041 super(endpoint); 042 this.endpoint = endpoint; 043 this.operations = operations; 044 } 045 046 @SuppressWarnings("unchecked") 047 public void process(Exchange exchange) throws Exception { 048 GenericFileExchange<T> fileExchange = (GenericFileExchange<T>) endpoint.createExchange(exchange); 049 processExchange(fileExchange); 050 ExchangeHelper.copyResults(exchange, fileExchange); 051 } 052 053 /** 054 * Perform the work to process the fileExchange 055 * 056 * @param exchange fileExchange 057 * @throws Exception is thrown if some error 058 */ 059 protected void processExchange(GenericFileExchange<T> exchange) throws Exception { 060 if (log.isTraceEnabled()) { 061 log.trace("Processing " + exchange); 062 } 063 064 try { 065 String target = createFileName(exchange); 066 067 preWriteCheck(); 068 069 // should we write to a temporary name and then afterwards rename to real target 070 boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempPrefix()); 071 String tempTarget = null; 072 if (writeAsTempAndRename) { 073 // compute temporary name with the temp prefix 074 tempTarget = createTempFileName(target); 075 } 076 077 // upload the file 078 writeFile(exchange, tempTarget != null ? tempTarget : target); 079 080 // if we did write to a temporary name then rename it to the real 081 // name after we have written the file 082 if (tempTarget != null) { 083 if (log.isTraceEnabled()) { 084 log.trace("Renaming file: [" + tempTarget + "] to: [" + target + "]"); 085 } 086 boolean renamed = operations.renameFile(tempTarget, target); 087 if (!renamed) { 088 throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target); 089 } 090 } 091 092 // lets store the name we really used in the header, so end-users 093 // can retrieve it 094 exchange.getIn().setHeader(Exchange.FILE_NAME_PRODUCED, target); 095 } catch (Exception e) { 096 handleFailedWrite(exchange, e); 097 } 098 } 099 100 /** 101 * If we fail writing out a file, we will call this method. This hook is 102 * provided to disconnect from servers or clean up files we created (if needed). 103 */ 104 protected void handleFailedWrite(GenericFileExchange<T> exchange, Exception exception) throws Exception { 105 throw exception; 106 } 107 108 /** 109 * Perform any actions that need to occur before we write such as connecting to an FTP server etc. 110 */ 111 protected void preWriteCheck() throws Exception { 112 } 113 114 protected void writeFile(GenericFileExchange<T> exchange, String fileName) throws GenericFileOperationFailedException { 115 InputStream payload = exchange.getIn().getBody(InputStream.class); 116 try { 117 // build directory if auto create is enabled 118 if (endpoint.isAutoCreate()) { 119 int lastPathIndex = fileName.lastIndexOf(File.separator); 120 if (lastPathIndex != -1) { 121 String directory = fileName.substring(0, lastPathIndex); 122 // skip trailing / 123 directory = FileUtil.stripLeadingSeparator(directory); 124 if (!operations.buildDirectory(directory, false)) { 125 log.debug("Cannot build directory [" + directory + "] (could be because of denied permissions)"); 126 } 127 } 128 } 129 130 // upload 131 if (log.isTraceEnabled()) { 132 log.trace("About to write [" + fileName + "] to [" + getEndpoint() + "] from exchange [" + exchange + "]"); 133 } 134 135 boolean success = operations.storeFile(fileName, exchange); 136 if (!success) { 137 throw new GenericFileOperationFailedException("Error writing file [" + fileName + "]"); 138 } 139 if (log.isDebugEnabled()) { 140 log.debug("Wrote [" + fileName + "] to [" + getEndpoint() + "]"); 141 } 142 143 } finally { 144 ObjectHelper.close(payload, "Closing payload", log); 145 } 146 147 } 148 149 protected String createFileName(Exchange exchange) { 150 String answer; 151 152 String name = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class); 153 154 // expression support 155 Expression expression = endpoint.getFileName(); 156 if (name != null) { 157 // the header name can be an expression too, that should override 158 // whatever configured on the endpoint 159 if (name.indexOf("${") > -1) { 160 if (log.isDebugEnabled()) { 161 log.debug(Exchange.FILE_NAME + " contains a FileLanguage expression: " + name); 162 } 163 expression = FileLanguage.file(name); 164 } 165 } 166 if (expression != null) { 167 if (log.isDebugEnabled()) { 168 log.debug("Filename evaluated as expression: " + expression); 169 } 170 name = expression.evaluate(exchange, String.class); 171 } 172 173 174 // flattern name 175 if (endpoint.isFlattern()) { 176 int pos = name.lastIndexOf(File.separator); 177 if (pos == -1) { 178 pos = name.lastIndexOf('/'); 179 } 180 if (pos != -1) { 181 name = name.substring(pos + 1); 182 } 183 } 184 185 // compute path by adding endpoint starting directory 186 String endpointPath = endpoint.getConfiguration().getDirectory(); 187 // Its a directory so we should use it as a base path for the filename 188 // If the path isn't empty, we need to add a trailing / if it isn't already there 189 String baseDir = ""; 190 if (endpointPath.length() > 0) { 191 baseDir = endpointPath + (endpointPath.endsWith(File.separator) ? "" : File.separator); 192 } 193 if (name != null) { 194 answer = baseDir + name; 195 } else { 196 // use a generated filename if no name provided 197 answer = baseDir + endpoint.getGeneratedFileName(exchange.getIn()); 198 } 199 200 // must normalize path to cater for Windows and other OS 201 answer = FileUtil.normalizePath(answer); 202 203 return answer; 204 } 205 206 protected String createTempFileName(String fileName) { 207 // must normalize path to cater for Windows and other OS 208 fileName = FileUtil.normalizePath(fileName); 209 210 int path = fileName.lastIndexOf(File.separator); 211 if (path == -1) { 212 // no path 213 return endpoint.getTempPrefix() + fileName; 214 } else { 215 StringBuilder sb = new StringBuilder(fileName); 216 sb.insert(path + 1, endpoint.getTempPrefix()); 217 return sb.toString(); 218 } 219 } 220 221 }