001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.LinkedList;
022    import java.util.List;
023    import java.util.Queue;
024    
025    import org.apache.camel.BatchConsumer;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.impl.DefaultExchange;
029    import org.apache.camel.impl.ScheduledPollConsumer;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * Base class for remote file consumers.
036     */
037    public abstract class GenericFileConsumer<T> extends ScheduledPollConsumer implements BatchConsumer {
038        protected final transient Log log = LogFactory.getLog(getClass());
039        protected GenericFileEndpoint<T> endpoint;
040        protected GenericFileOperations<T> operations;
041        protected boolean loggedIn;
042        protected String fileExpressionResult;
043        protected int maxMessagesPerPoll;
044    
045        public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
046            super(endpoint, processor);
047            this.endpoint = endpoint;
048            this.operations = operations;
049        }
050    
051        /**
052         * Poll for files
053         */
054        protected void poll() throws Exception {
055            // must reset for each poll
056            fileExpressionResult = null;
057    
058            // before we poll is there anything we need to check ? Such as are we
059            // connected to the FTP Server Still ?
060            if (!prePollCheck()) {
061                log.debug("Skipping pool as pre poll check returned false");
062            }
063    
064            // gather list of files to process
065            List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
066    
067            String name = endpoint.getConfiguration().getDirectory();
068            pollDirectory(name, files);
069    
070            // sort files using file comparator if provided
071            if (endpoint.getSorter() != null) {
072                Collections.sort(files, endpoint.getSorter());
073            }
074    
075            // sort using build in sorters so we can use expressions
076            LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
077            for (GenericFile<T> file : files) {
078                Exchange exchange = endpoint.createExchange(file);
079                endpoint.configureMessage(file, exchange.getIn());
080                exchanges.add(exchange);
081            }
082            // sort files using exchange comparator if provided
083            if (endpoint.getSortBy() != null) {
084                Collections.sort(exchanges, endpoint.getSortBy());
085            }
086    
087            // consume files one by one
088            int total = exchanges.size();
089            if (total > 0 && log.isDebugEnabled()) {
090                log.debug("Total " + total + " files to consume");
091            }
092    
093            processBatch(exchanges);
094        }
095    
096        public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
097            this.maxMessagesPerPoll = maxMessagesPerPoll;
098        }
099    
100        @SuppressWarnings("unchecked")
101        public void processBatch(Queue exchanges) {
102            int total = exchanges.size();
103    
104            // limit if needed
105            if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
106                log.debug("Limiting to maximum messages to poll " + maxMessagesPerPoll + " as there was " + total + " messages in this poll.");
107                total = maxMessagesPerPoll;
108            }
109    
110            for (int index = 0; index < total && isRunAllowed(); index++) {
111                // only loop if we are started (allowed to run)
112                // use poll to remove the head so it does not consume memory even after we have processed it
113                Exchange exchange = (Exchange) exchanges.poll();
114                // add current index and total as properties
115                exchange.setProperty(Exchange.BATCH_INDEX, index);
116                exchange.setProperty(Exchange.BATCH_SIZE, total);
117                exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
118    
119                // process the current exchange
120                processExchange(exchange);
121            }
122            
123            // remove the file from the in progress list in case the batch was limited by max messages per poll
124            while (exchanges.size() > 0) {
125                Exchange exchange = (Exchange) exchanges.poll();
126                GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
127                String key = file.getFileName();
128                endpoint.getInProgressRepository().remove(key);
129            }
130        }
131    
132        /**
133         * Override if required. Perform some checks (and perhaps actions) before we
134         * poll.
135         *
136         * @return true to poll, false to skip this poll.
137         */
138        protected boolean prePollCheck() throws Exception {
139            return true;
140        }
141    
142        /**
143         * Polls the given directory for files to process
144         *
145         * @param fileName current directory or file
146         * @param fileList current list of files gathered
147         */
148        protected abstract void pollDirectory(String fileName, List<GenericFile<T>> fileList);
149    
150        /**
151         * Processes the exchange
152         *
153         * @param exchange the exchange
154         */
155        protected void processExchange(final Exchange exchange) {
156            GenericFile<T> file = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
157            if (log.isTraceEnabled()) {
158                log.trace("Processing remote file: " + file);
159            }
160    
161            try {
162                final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
163    
164                boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
165                if (!begin) {
166                    log.debug(endpoint + " cannot begin processing file: " + file);
167                    // remove file from the in progress list as its no longer in progress
168                    endpoint.getInProgressRepository().remove(file.getFileName());
169                    return;
170                }
171    
172                // must use file from exchange as it can be updated due the
173                // preMoveNamePrefix/preMoveNamePostfix options
174                final GenericFile<T> target = (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
175                // must use full name when downloading so we have the correct path
176                final String name = target.getAbsoluteFilePath();
177    
178                // retrieve the file using the stream
179                if (log.isTraceEnabled()) {
180                    log.trace("Retreiving file: " + name + " from: " + endpoint);
181                }
182    
183                operations.retrieveFile(name, exchange);
184    
185                if (log.isTraceEnabled()) {
186                    log.trace("Retrieved file: " + name + " from: " + endpoint);
187                }
188    
189                if (log.isDebugEnabled()) {
190                    log.debug("About to process file: " + target + " using exchange: " + exchange);
191                }
192    
193                // register on completion callback that does the completiom stategies
194                // (for instance to move the file after we have processed it)
195                exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations));
196    
197                // process the exchange
198                getProcessor().process(exchange);
199    
200            } catch (Exception e) {
201                handleException(e);
202            }
203        }
204    
205        /**
206         * Strategy for validating if the given remote file should be included or
207         * not
208         *
209         * @param file        the remote file
210         * @param isDirectory wether the file is a directory or a file
211         * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
212         */
213        @SuppressWarnings("unchecked")
214        protected boolean isValidFile(GenericFile<T> file, boolean isDirectory) {
215            if (!isMatched(file, isDirectory)) {
216                if (log.isTraceEnabled()) {
217                    log.trace("File did not match. Will skip this file: " + file);
218                }
219                return false;
220            } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getFileName())) {
221                // only use the filename as the key as the file could be moved into a done folder
222                if (log.isTraceEnabled()) {
223                    log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: " + file);
224                }
225                return false;
226            }
227    
228            // file matched
229            return true;
230        }
231    
232        /**
233         * Strategy to perform file matching based on endpoint configuration.
234         * <p/>
235         * Will always return <tt>false</tt> for certain files/folders:
236         * <ul>
237         * <li>Starting with a dot</li>
238         * <li>lock files</li>
239         * </ul>
240         * And then <tt>true</tt> for directories.
241         *
242         * @param file        the file
243         * @param isDirectory wether the file is a directory or a file
244         * @return <tt>true</tt> if the remote file is matched, <tt>false</tt> if not
245         */
246        protected boolean isMatched(GenericFile<T> file, boolean isDirectory) {
247            String name = file.getFileNameOnly();
248    
249            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
250            if (name.startsWith(".")) {
251                return false;
252            }
253    
254            // lock files should be skipped
255            if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
256                return false;
257            }
258    
259            // directories so far is always regarded as matched (matching on the name is only for files)
260            if (isDirectory) {
261                return true;
262            }
263    
264            if (endpoint.getFilter() != null) {
265                if (!endpoint.getFilter().accept(file)) {
266                    return false;
267                }
268            }
269    
270            if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
271                if (name.matches(endpoint.getExclude())) {
272                    return false;
273                }
274            }
275    
276            if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
277                if (!name.matches(endpoint.getInclude())) {
278                    return false;
279                }
280            }
281    
282            // use file expression for a simple dynamic file filter
283            if (endpoint.getFileName() != null) {
284                evaluteFileExpression();
285                if (fileExpressionResult != null) {
286                    if (!name.equals(fileExpressionResult)) {
287                        return false;
288                    }
289                }
290            }
291    
292            return true;
293        }
294    
295        /**
296         * Is the given file already in progress.
297         *
298         * @param file the file
299         * @return <tt>true</tt> if the file is already in progress
300         */
301        protected boolean isInProgress(GenericFile<T> file) {
302            String key = file.getFileName();
303            return !endpoint.getInProgressRepository().add(key);
304        }
305    
306        private void evaluteFileExpression() {
307            if (fileExpressionResult == null) {
308                // create a dummy exchange as Exchange is needed for expression evaluation
309                Exchange dummy = new DefaultExchange(endpoint.getCamelContext());
310                fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
311            }
312        }
313    
314    }