001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.LinkedList; 022 import java.util.List; 023 import java.util.Queue; 024 025 import org.apache.camel.BatchConsumer; 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Processor; 028 import org.apache.camel.impl.DefaultExchange; 029 import org.apache.camel.impl.ScheduledPollConsumer; 030 import org.apache.camel.util.ObjectHelper; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 /** 035 * Base class for remote file consumers. 036 */ 037 public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer { 038 protected final transient Log log = LogFactory.getLog(getClass()); 039 protected GenericFileEndpoint<T> endpoint; 040 protected GenericFileOperations<T> operations; 041 protected boolean loggedIn; 042 protected String fileExpressionResult; 043 protected int maxMessagesPerPoll; 044 045 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { 046 super(endpoint, processor); 047 this.endpoint = endpoint; 048 this.operations = operations; 049 } 050 051 /** 052 * Poll for files 053 */ 054 protected void poll() throws Exception { 055 // must reset for each poll 056 fileExpressionResult = null; 057 058 // before we poll is there anything we need to check ? Such as are we 059 // connected to the FTP Server Still ? 060 if (!prePollCheck()) { 061 log.debug("Skipping pool as pre poll check returned false"); 062 } 063 064 // gather list of files to process 065 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); 066 067 String name = endpoint.getConfiguration().getDirectory(); 068 pollDirectory(name, files); 069 070 // sort files using file comparator if provided 071 if (endpoint.getSorter() != null) { 072 Collections.sort(files, endpoint.getSorter()); 073 } 074 075 // sort using build in sorters that is expression based 076 // first we need to convert to RemoteFileExchange objects so we can sort 077 // using expressions 078 LinkedList<GenericFileExchange> exchanges = new LinkedList<GenericFileExchange>(); 079 for (GenericFile<T> file : files) { 080 GenericFileExchange<T> exchange = endpoint.createExchange(file); 081 endpoint.configureMessage(file, exchange.getIn()); 082 exchanges.add(exchange); 083 } 084 // sort files using exchange comparator if provided 085 if (endpoint.getSortBy() != null) { 086 Collections.sort(exchanges, endpoint.getSortBy()); 087 } 088 089 // consume files one by one 090 int total = exchanges.size(); 091 if (total > 0 && log.isDebugEnabled()) { 092 log.debug("Total " + total + " files to consume"); 093 } 094 095 processBatch(exchanges); 096 } 097 098 public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { 099 this.maxMessagesPerPoll = maxMessagesPerPoll; 100 } 101 102 @SuppressWarnings("unchecked") 103 public void processBatch(Queue exchanges) { 104 int total = exchanges.size(); 105 106 // limit if needed 107 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 108 log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll."); 109 total = maxMessagesPerPoll; 110 } 111 112 for (int index = 0; index < total && isRunAllowed(); index++) { 113 // only loop if we are started (allowed to run) 114 // use poll to remove the head so it does not consume memory even after we have processed it 115 GenericFileExchange<T> exchange = (GenericFileExchange<T>) exchanges.poll(); 116 // add current index and total as properties 117 exchange.setProperty(Exchange.BATCH_INDEX, index); 118 exchange.setProperty(Exchange.BATCH_SIZE, total); 119 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 120 121 // process the current exchange 122 processExchange(exchange); 123 } 124 125 // remove the file from the in progress list in case the batch was limited by max messages per poll 126 for (int index = 0; index < exchanges.size() && isRunAllowed(); index++) { 127 GenericFileExchange<T> exchange = (GenericFileExchange<T>) exchanges.poll(); 128 String key = exchange.getGenericFile().getFileName(); 129 endpoint.getInProgressRepository().remove(key); 130 } 131 } 132 133 /** 134 * Override if required. Perform some checks (and perhaps actions) before we 135 * poll. 136 * 137 * @return true to poll, false to skip this poll. 138 */ 139 protected boolean prePollCheck() throws Exception { 140 return true; 141 } 142 143 /** 144 * Polls the given directory for files to process 145 * 146 * @param fileName current directory or file 147 * @param fileList current list of files gathered 148 */ 149 protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList); 150 151 /** 152 * Processes the exchange 153 * 154 * @param exchange the exchange 155 */ 156 protected void processExchange(final GenericFileExchange<T> exchange) { 157 if (log.isTraceEnabled()) { 158 log.trace("Processing remote file: " + exchange.getGenericFile()); 159 } 160 161 try { 162 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 163 164 boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile()); 165 if (!begin) { 166 log.warn(endpoint + " cannot begin processing file: " + exchange.getGenericFile()); 167 return; 168 } 169 170 // must use file from exchange as it can be updated due the 171 // preMoveNamePrefix/preMoveNamePostfix options 172 final GenericFile<T> target = exchange.getGenericFile(); 173 // must use full name when downloading so we have the correct path 174 final String name = target.getAbsoluteFilePath(); 175 176 // retrieve the file using the stream 177 if (log.isTraceEnabled()) { 178 log.trace("Retreiving file: " + name + " from: " + endpoint); 179 } 180 181 operations.retrieveFile(name, exchange); 182 183 if (log.isTraceEnabled()) { 184 log.trace("Retrieved file: " + name + " from: " + endpoint); 185 } 186 187 if (log.isDebugEnabled()) { 188 log.debug("About to process file: " + target + " using exchange: " + exchange); 189 } 190 191 // register on completion callback that does the completiom stategies 192 // (for instance to move the file after we have processed it) 193 exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations)); 194 195 // process the exchange 196 getProcessor().process(exchange); 197 198 } catch (Exception e) { 199 handleException(e); 200 } 201 202 } 203 204 /** 205 * Strategy for validating if the given remote file should be included or 206 * not 207 * 208 * @param file the remote file 209 * @param isDirectory wether the file is a directory or a file 210 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 211 */ 212 @SuppressWarnings("unchecked") 213 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) { 214 if (!isMatched(file, isDirectory)) { 215 if (log.isTraceEnabled()) { 216 log.trace("File did not match. Will skip this file: " + file); 217 } 218 return false; 219 } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) { 220 // only use the filename as the key as the file could be moved into a done folder 221 if (log.isTraceEnabled()) { 222 log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file); 223 } 224 return false; 225 } 226 227 // file matched 228 return true; 229 } 230 231 /** 232 * Strategy to perform file matching based on endpoint configuration. 233 * <p/> 234 * Will always return <tt>false</tt> for certain files/folders: 235 * <ul> 236 * <li>Starting with a dot</li> 237 * <li>lock files</li> 238 * </ul> 239 * And then <tt>true</tt> for directories. 240 * 241 * @param file the file 242 * @param isDirectory wether the file is a directory or a file 243 * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not 244 */ 245 protected boolean isMatched(GenericFile<T> file, boolean isDirectory) { 246 String name = file.getFileNameOnly(); 247 248 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 249 if (name.startsWith(".")) { 250 return false; 251 } 252 253 // lock files should be skipped 254 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 255 return false; 256 } 257 258 // directories so far is always regarded as matched (matching on the name is only for files) 259 if (isDirectory) { 260 return true; 261 } 262 263 if (endpoint.getFilter() != null) { 264 if (!endpoint.getFilter().accept(file)) { 265 return false; 266 } 267 } 268 269 if (ObjectHelper.isNotEmpty(endpoint.getExclude())) { 270 if (name.matches(endpoint.getExclude())) { 271 return false; 272 } 273 } 274 275 if (ObjectHelper.isNotEmpty(endpoint.getInclude())) { 276 if (!name.matches(endpoint.getInclude())) { 277 return false; 278 } 279 } 280 281 // use file expression for a simple dynamic file filter 282 if (endpoint.getFileName() != null) { 283 evaluteFileExpression(); 284 if (fileExpressionResult != null) { 285 if (!name.equals(fileExpressionResult)) { 286 return false; 287 } 288 } 289 } 290 291 return true; 292 } 293 294 /** 295 * Is the given file already in progress. 296 * 297 * @param file the file 298 * @return <tt>true</tt> if the file is already in progress 299 */ 300 protected boolean isInProgress(GenericFile<T> file) { 301 String key = file.getFileName(); 302 return !endpoint.getInProgressRepository().add(key); 303 } 304 305 private void evaluteFileExpression() { 306 if (fileExpressionResult == null) { 307 // create a dummy exchange as Exchange is needed for expression evaluation 308 Exchange dummy = new DefaultExchange(endpoint.getCamelContext()); 309 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 310 } 311 } 312 313 }