001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import org.apache.camel.Processor;
020    import org.apache.camel.component.file.strategy.FileProcessStrategy;
021    import org.apache.camel.impl.ScheduledPollConsumer;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    
025    import java.io.File;
026    
027    /**
028     * @version $Revision: 523016 $
029     */
030    public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
031        private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
032        private final FileEndpoint endpoint;
033        private boolean recursive = true;
034        private String regexPattern = "";
035        private long lastPollTime;
036    
037        public FileConsumer(final FileEndpoint endpoint, Processor processor) {
038            super(endpoint, processor);
039            this.endpoint = endpoint;
040        }
041    
042        protected void poll() throws Exception {
043            pollFileOrDirectory(endpoint.getFile(), isRecursive());
044            lastPollTime = System.currentTimeMillis();
045        }
046    
047        protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
048            if (!fileOrDirectory.isDirectory()) {
049                pollFile(fileOrDirectory); // process the file
050            }
051            else if (processDir) {
052                if (isValidFile(fileOrDirectory)) {
053                    LOG.debug("Polling directory " + fileOrDirectory);
054                    File[] files = fileOrDirectory.listFiles();
055                    for (int i = 0; i < files.length; i++) {
056                        pollFileOrDirectory(files[i], isRecursive()); // self-recursion
057                    }
058                }
059            }
060            else {
061                LOG.debug("Skipping directory " + fileOrDirectory);
062            }
063        }
064    
065        protected void pollFile(final File file) {
066            if (!file.exists()) {
067                return;
068            }
069            if (isValidFile(file)) {
070                // we only care about file modified times if we are not deleting/moving files
071                if (endpoint.isNoop()) {
072                    long fileModified = file.lastModified();
073                    if (fileModified <= lastPollTime) {
074                        if (LOG.isDebugEnabled()) {
075                            LOG.debug("Ignoring file: " + file + " as modified time: " + fileModified + " less than last poll time: " + lastPollTime);
076                        }
077                        return;
078                    }
079                }
080    
081                FileProcessStrategy processStrategy = endpoint.getFileStrategy();
082                FileExchange exchange = endpoint.createExchange(file);
083    
084                try {
085                    if (LOG.isDebugEnabled()) {
086                        LOG.debug("About to process file:  " + file + " using exchange: " + exchange);
087                    }
088                    if (processStrategy.begin(endpoint, exchange, file)) {
089                        getProcessor().process(exchange);
090                        processStrategy.commit(endpoint, exchange, file);
091                    }
092                    else {
093                        if (LOG.isDebugEnabled()) {
094                            LOG.debug(endpoint + " cannot process file: " + file);
095                        }
096                    }
097                }
098                catch (Throwable e) {
099                    handleException(e);
100                }
101            }
102        }
103    
104        protected boolean isValidFile(File file) {
105            boolean result = false;
106            if (file != null && file.exists()) {
107                if (isMatched(file)) {
108                    result = true;
109                }
110            }
111            return result;
112        }
113    
114        protected boolean isMatched(File file) {
115            String name = file.getName();
116            if (regexPattern != null && regexPattern.length() > 0) {
117                if (!name.matches(getRegexPattern())) {
118                    return false;
119                }
120            }
121            String[] prefixes = endpoint.getExcludedNamePrefixes();
122            if (prefixes != null) {
123                for (String prefix : prefixes) {
124                    if (name.startsWith(prefix)) {
125                        return false;
126                    }
127                }
128            }
129            String[] postfixes = endpoint.getExcludedNamePostfixes();
130            if (postfixes != null) {
131                for (String postfix : postfixes) {
132                    if (name.endsWith(postfix)) {
133                        return false;
134                    }
135                }
136            }
137            return true;
138        }
139    
140        /**
141         * @return the recursive
142         */
143        public boolean isRecursive() {
144            return this.recursive;
145        }
146    
147        /**
148         * @param recursive the recursive to set
149         */
150        public void setRecursive(boolean recursive) {
151            this.recursive = recursive;
152        }
153    
154        /**
155         * @return the regexPattern
156         */
157        public String getRegexPattern() {
158            return this.regexPattern;
159        }
160    
161        /**
162         * @param regexPattern the regexPattern to set
163         */
164        public void setRegexPattern(String regexPattern) {
165            this.regexPattern = regexPattern;
166        }
167    }