001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.List; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.Processor; 026 import org.apache.camel.impl.DefaultExchange; 027 import org.apache.camel.impl.ScheduledPollConsumer; 028 import org.apache.camel.processor.DeadLetterChannel; 029 import org.apache.camel.util.ObjectHelper; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 /** 034 * Base class for remote file consumers. 035 */ 036 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer { 037 protected final transient Log log = LogFactory.getLog(getClass()); 038 protected GenericFileEndpoint<T> endpoint; 039 protected GenericFileOperations<T> operations; 040 protected boolean loggedIn; 041 protected String fileExpressionResult; 042 043 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { 044 super(endpoint, processor); 045 this.endpoint = endpoint; 046 this.operations = operations; 047 } 048 049 /** 050 * Poll for files 051 */ 052 protected void poll() throws Exception { 053 // must reset for each poll 054 fileExpressionResult = null; 055 056 // before we poll is there anything we need to check ? Such as are we 057 // connected to the FTP Server Still ? 058 if (!prePollCheck()) { 059 log.debug("Skipping pool as pre poll check returned false"); 060 } 061 062 // gather list of files to process 063 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); 064 065 String name = endpoint.getConfiguration().getDirectory(); 066 pollDirectory(name, files); 067 068 // sort files using file comparator if provided 069 if (endpoint.getSorter() != null) { 070 Collections.sort(files, endpoint.getSorter()); 071 } 072 073 // sort using build in sorters that is expression based 074 // first we need to convert to RemoteFileExchange objects so we can sort 075 // using expressions 076 List<GenericFileExchange<T>> exchanges = new ArrayList<GenericFileExchange<T>>(files.size()); 077 for (GenericFile<T> file : files) { 078 GenericFileExchange<T> exchange = endpoint.createExchange(file); 079 endpoint.configureMessage(file, exchange.getIn()); 080 exchanges.add(exchange); 081 } 082 // sort files using exchange comparator if provided 083 if (endpoint.getSortBy() != null) { 084 Collections.sort(exchanges, endpoint.getSortBy()); 085 } 086 087 // consume files one by one 088 int total = exchanges.size(); 089 if (total > 0 && log.isDebugEnabled()) { 090 log.debug("Total " + total + " files to consume"); 091 } 092 for (int index = 0; index < total && isRunAllowed(); index++) { 093 // only loop if we are started (allowed to run) 094 GenericFileExchange<T> exchange = exchanges.get(index); 095 // add current index and total as headers 096 exchange.getIn().setHeader(Exchange.FILE_BATCH_INDEX, index); 097 exchange.getIn().setHeader(Exchange.FILE_BATCH_SIZE, total); 098 processExchange(exchange); 099 } 100 } 101 102 /** 103 * Override if required. Perform some checks (and perhaps actions) before we 104 * poll. 105 * 106 * @return true to poll, false to skip this poll. 107 */ 108 protected boolean prePollCheck() throws Exception { 109 return true; 110 } 111 112 /** 113 * Polls the given directory for files to process 114 * 115 * @param fileName current directory or file 116 * @param fileList current list of files gathered 117 */ 118 protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList); 119 120 /** 121 * Processes the exchange 122 * 123 * @param exchange the exchange 124 */ 125 protected void processExchange(final GenericFileExchange<T> exchange) { 126 if (log.isTraceEnabled()) { 127 log.trace("Processing remote file: " + exchange.getGenericFile()); 128 } 129 130 try { 131 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 132 133 if (processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile())) { 134 135 // must use file from exchange as it can be updated due the 136 // preMoveNamePrefix/preMoveNamePostfix options 137 final GenericFile<T> target = exchange.getGenericFile(); 138 // must use full name when downloading so we have the correct path 139 final String name = target.getAbsoluteFilePath(); 140 141 // retrieve the file using the stream 142 if (log.isTraceEnabled()) { 143 log.trace("Retreiving file: " + name + " from: " + endpoint); 144 } 145 146 operations.retrieveFile(name, exchange); 147 148 if (log.isTraceEnabled()) { 149 log.trace("Retrieved file: " + name + " from: " + endpoint); 150 } 151 152 if (log.isDebugEnabled()) { 153 log.debug("About to process file: " + target + " using exchange: " + exchange); 154 } 155 // Use the async processor interface so that processing of 156 // the exchange can happen asynchronously 157 getAsyncProcessor().process(exchange, new AsyncCallback() { 158 public void done(boolean sync) { 159 final GenericFile<T> file = exchange.getGenericFile(); 160 boolean failed = exchange.isFailed(); 161 162 if (log.isDebugEnabled()) { 163 log.debug("Done processing file: " + file + " using exchange: " + exchange); 164 } 165 166 boolean committed = false; 167 try { 168 if (!failed) { 169 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel 170 processStrategyCommit(processStrategy, exchange, file); 171 committed = true; 172 } else { 173 // there was an exception but it was not handled by the DeadLetterChannel 174 handleException(exchange.getException()); 175 } 176 } finally { 177 if (!committed) { 178 processStrategyRollback(processStrategy, exchange, file); 179 } 180 } 181 } 182 }); 183 } else { 184 log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile()); 185 } 186 } catch (Exception e) { 187 handleException(e); 188 } 189 190 } 191 192 /** 193 * Strategy when the file was processed and a commit should be executed. 194 * 195 * @param processStrategy the strategy to perform the commit 196 * @param exchange the exchange 197 * @param file the file processed 198 */ 199 @SuppressWarnings("unchecked") 200 protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy, 201 GenericFileExchange<T> exchange, GenericFile<T> file) { 202 if (endpoint.isIdempotent()) { 203 // only add to idempotent repository if we could process the file 204 // only use the filename as the key as the file could be moved into a done folder 205 endpoint.getIdempotentRepository().add(file.getFileName()); 206 } 207 208 try { 209 if (log.isTraceEnabled()) { 210 log.trace("Committing remote file strategy: " + processStrategy + " for file: " + file); 211 } 212 processStrategy.commit(operations, endpoint, exchange, file); 213 } catch (Exception e) { 214 handleException(e); 215 } 216 } 217 218 /** 219 * Strategy when the file was not processed and a rollback should be 220 * executed. 221 * 222 * @param processStrategy the strategy to perform the commit 223 * @param exchange the exchange 224 * @param file the file processed 225 */ 226 protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy, 227 GenericFileExchange<T> exchange, GenericFile<T> file) { 228 if (log.isWarnEnabled()) { 229 log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file); 230 } 231 try { 232 processStrategy.rollback(operations, endpoint, exchange, file); 233 } catch (Exception e) { 234 handleException(e); 235 } 236 } 237 238 /** 239 * Strategy for validating if the given remote file should be included or 240 * not 241 * 242 * @param file the remote file 243 * @param isDirectory wether the file is a directory or a file 244 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 245 */ 246 @SuppressWarnings("unchecked") 247 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) { 248 if (!isMatched(file, isDirectory)) { 249 if (log.isTraceEnabled()) { 250 log.trace("Remote file did not match. Will skip this remote file: " + file); 251 } 252 return false; 253 } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) { 254 // only use the filename as the key as the file could be moved into a done folder 255 if (log.isTraceEnabled()) { 256 log.trace("RemoteFileConsumer is idempotent and the file has been consumed before. Will skip this remote file: " + file); 257 } 258 return false; 259 } 260 261 // file matched 262 return true; 263 } 264 265 /** 266 * Strategy to perform file matching based on endpoint configuration. 267 * <p/> 268 * Will always return <tt>false</tt> for certain files/folders: 269 * <ul> 270 * <li>Starting with a dot</li> 271 * <li>lock files</li> 272 * </ul> 273 * And then <tt>true</tt> for directories. 274 * 275 * @param file the remote file 276 * @param isDirectory wether the file is a directory or a file 277 * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not 278 */ 279 protected boolean isMatched(GenericFile<T> file, boolean isDirectory) { 280 String name = file.getFileNameOnly(); 281 282 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 283 if (name.startsWith(".")) { 284 return false; 285 } 286 287 // lock files should be skipped 288 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 289 return false; 290 } 291 292 // directories so far is always regarded as matched (matching on the name is only for files) 293 if (isDirectory) { 294 return true; 295 } 296 297 if (endpoint.getFilter() != null) { 298 if (!endpoint.getFilter().accept(file)) { 299 return false; 300 } 301 } 302 303 if (ObjectHelper.isNotEmpty(endpoint.getExclude())) { 304 if (name.matches(endpoint.getExclude())) { 305 return false; 306 } 307 } 308 309 if (ObjectHelper.isNotEmpty(endpoint.getInclude())) { 310 if (!name.matches(endpoint.getInclude())) { 311 return false; 312 } 313 } 314 315 // use file expression for a simple dynamic file filter 316 if (endpoint.getFileName() != null) { 317 evaluteFileExpression(); 318 if (fileExpressionResult != null) { 319 if (!name.equals(fileExpressionResult)) { 320 return false; 321 } 322 } 323 } 324 325 return true; 326 } 327 328 private void evaluteFileExpression() { 329 if (fileExpressionResult == null) { 330 // create a dummy exchange as Exchange is needed for expression evaluation 331 Exchange dummy = new DefaultExchange(endpoint.getCamelContext()); 332 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 333 } 334 } 335 336 337 }