001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Queue;
024    
025    import org.apache.camel.BatchConsumer;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.impl.DefaultExchange;
029    import org.apache.camel.impl.ScheduledPollConsumer;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * Base class for remote file consumers.
036     */
037    public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer {
038        protected final transient Log log = LogFactory.getLog(getClass());
039        protected GenericFileEndpoint<T> endpoint;
040        protected GenericFileOperations<T> operations;
041        protected boolean loggedIn;
042        protected String fileExpressionResult;
043        protected int maxMessagesPerPoll;
044    
045        public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
046            super(endpoint, processor);
047            this.endpoint = endpoint;
048            this.operations = operations;
049        }
050    
051        /**
052         * Poll for files
053         */
054        protected void poll() throws Exception {
055            // must reset for each poll
056            fileExpressionResult = null;
057    
058            // before we poll is there anything we need to check ? Such as are we
059            // connected to the FTP Server Still ?
060            if (!prePollCheck()) {
061                log.debug("Skipping pool as pre poll check returned false");
062            }
063    
064            // gather list of files to process
065            List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
066    
067            String name = endpoint.getConfiguration().getDirectory();
068            pollDirectory(name, files);
069    
070            // sort files using file comparator if provided
071            if (endpoint.getSorter() != null) {
072                Collections.sort(files, endpoint.getSorter());
073            }
074    
075            // sort using build in sorters that is expression based
076            // first we need to convert to RemoteFileExchange objects so we can sort
077            // using expressions
078            LinkedList<GenericFileExchange> exchanges = new LinkedList<GenericFileExchange>();
079            for (GenericFile<T> file : files) {
080                GenericFileExchange<T> exchange = endpoint.createExchange(file);
081                endpoint.configureMessage(file, exchange.getIn());
082                exchanges.add(exchange);
083            }
084            // sort files using exchange comparator if provided
085            if (endpoint.getSortBy() != null) {
086                Collections.sort(exchanges, endpoint.getSortBy());
087            }
088    
089            // consume files one by one
090            int total = exchanges.size();
091            if (total > 0 && log.isDebugEnabled()) {
092                log.debug("Total " + total + " files to consume");
093            }
094    
095            processBatch(exchanges);
096        }
097    
098        public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
099            this.maxMessagesPerPoll = maxMessagesPerPoll;
100        }
101    
102        @SuppressWarnings("unchecked")
103        public void processBatch(Queue exchanges) {
104            int total = exchanges.size();
105    
106            // limit if needed
107            if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
108                log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
109                total = maxMessagesPerPoll;
110            }
111    
112            for (int index = 0; index < total && isRunAllowed(); index++) {
113                // only loop if we are started (allowed to run)
114                // use poll to remove the head so it does not consume memory even after we have processed it
115                GenericFileExchange<T> exchange = (GenericFileExchange<T>) exchanges.poll();
116                // add current index and total as properties
117                exchange.setProperty(Exchange.BATCH_INDEX, index);
118                exchange.setProperty(Exchange.BATCH_SIZE, total);
119                exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
120    
121                // process the current exchange
122                processExchange(exchange);
123            }
124    
125            // remove the file from the in progress list in case the batch was limited by max messages per poll
126            for (int index = 0; index < exchanges.size() && isRunAllowed(); index++) {
127                GenericFileExchange<T> exchange = (GenericFileExchange<T>) exchanges.poll();
128                String key = exchange.getGenericFile().getFileName();
129                endpoint.getInProgressRepository().remove(key);
130            }
131        }
132    
133        /**
134         * Override if required. Perform some checks (and perhaps actions) before we
135         * poll.
136         *
137         * @return true to poll, false to skip this poll.
138         */
139        protected boolean prePollCheck() throws Exception {
140            return true;
141        }
142    
143        /**
144         * Polls the given directory for files to process
145         *
146         * @param fileName current directory or file
147         * @param fileList current list of files gathered
148         */
149        protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList);
150    
151        /**
152         * Processes the exchange
153         *
154         * @param exchange the exchange
155         */
156        protected void processExchange(final GenericFileExchange<T> exchange) {
157            if (log.isTraceEnabled()) {
158                log.trace("Processing remote file: " + exchange.getGenericFile());
159            }
160    
161            try {
162                final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
163    
164                boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile());
165                if (!begin) {
166                    log.warn(endpoint + " cannot begin processing file: " + exchange.getGenericFile());
167                    return;
168                }
169    
170                // must use file from exchange as it can be updated due the
171                // preMoveNamePrefix/preMoveNamePostfix options
172                final GenericFile<T> target = exchange.getGenericFile();
173                // must use full name when downloading so we have the correct path
174                final String name = target.getAbsoluteFilePath();
175    
176                // retrieve the file using the stream
177                if (log.isTraceEnabled()) {
178                    log.trace("Retreiving file: " + name + " from: " + endpoint);
179                }
180    
181                operations.retrieveFile(name, exchange);
182    
183                if (log.isTraceEnabled()) {
184                    log.trace("Retrieved file: " + name + " from: " + endpoint);
185                }
186    
187                if (log.isDebugEnabled()) {
188                    log.debug("About to process file: " + target + " using exchange: " + exchange);
189                }
190    
191                // register on completion callback that does the completiom stategies
192                // (for instance to move the file after we have processed it)
193                exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations));
194    
195                // process the exchange
196                getProcessor().process(exchange);
197    
198            } catch (Exception e) {
199                handleException(e);
200            }
201    
202        }
203    
204        /**
205         * Strategy for validating if the given remote file should be included or
206         * not
207         *
208         * @param file        the remote file
209         * @param isDirectory wether the file is a directory or a file
210         * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
211         */
212        @SuppressWarnings("unchecked")
213        protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
214            if (!isMatched(file, isDirectory)) {
215                if (log.isTraceEnabled()) {
216                    log.trace("File did not match. Will skip this file: " + file);
217                }
218                return false;
219            } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
220                // only use the filename as the key as the file could be moved into a done folder
221                if (log.isTraceEnabled()) {
222                    log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
223                }
224                return false;
225            }
226    
227            // file matched
228            return true;
229        }
230    
231        /**
232         * Strategy to perform file matching based on endpoint configuration.
233         * <p/>
234         * Will always return <tt>false</tt> for certain files/folders:
235         * <ul>
236         * <li>Starting with a dot</li>
237         * <li>lock files</li>
238         * </ul>
239         * And then <tt>true</tt> for directories.
240         *
241         * @param file        the file
242         * @param isDirectory wether the file is a directory or a file
243         * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
244         */
245        protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
246            String name = file.getFileNameOnly();
247    
248            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
249            if (name.startsWith(".")) {
250                return false;
251            }
252    
253            // lock files should be skipped
254            if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
255                return false;
256            }
257    
258            // directories so far is always regarded as matched (matching on the name is only for files)
259            if (isDirectory) {
260                return true;
261            }
262    
263            if (endpoint.getFilter() != null) {
264                if (!endpoint.getFilter().accept(file)) {
265                    return false;
266                }
267            }
268    
269            if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
270                if (name.matches(endpoint.getExclude())) {
271                    return false;
272                }
273            }
274    
275            if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
276                if (!name.matches(endpoint.getInclude())) {
277                    return false;
278                }
279            }
280    
281            // use file expression for a simple dynamic file filter
282            if (endpoint.getFileName() != null) {
283                evaluteFileExpression();
284                if (fileExpressionResult != null) {
285                    if (!name.equals(fileExpressionResult)) {
286                        return false;
287                    }
288                }
289            }
290    
291            return true;
292        }
293    
294        /**
295         * Is the given file already in progress.
296         *
297         * @param file the file
298         * @return <tt>true</tt> if the file is already in progress
299         */
300        protected boolean isInProgress(GenericFile<T> file) {
301            String key = file.getFileName();
302            return !endpoint.getInProgressRepository().add(key);
303        }
304    
305        private void evaluteFileExpression() {
306            if (fileExpressionResult == null) {
307                // create a dummy exchange as Exchange is needed for expression evaluation
308                Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
309                fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
310            }
311        }
312    
313    }