001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.file;
019    
020    import org.apache.camel.Processor;
021    import org.apache.camel.impl.ScheduledPollConsumer;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    
025    import java.io.File;
026    import java.io.IOException;
027    import java.io.RandomAccessFile;
028    import java.nio.channels.FileChannel;
029    
030    /**
031     * @version $Revision: 523016 $
032     */
033    public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
034        private static final transient Log log = LogFactory.getLog(FileConsumer.class);
035        private final FileEndpoint endpoint;
036        private boolean recursive = true;
037        private boolean attemptFileLock = false;
038        private String regexPattern = "";
039        private long lastPollTime = 0l;
040    
041        public FileConsumer(final FileEndpoint endpoint, Processor processor) {
042            super(endpoint, processor);
043            this.endpoint = endpoint;
044        }
045    
046        protected void poll() throws Exception {
047            pollFileOrDirectory(endpoint.getFile(), isRecursive());
048            lastPollTime = System.currentTimeMillis();
049        }
050    
051        protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
052            if (!fileOrDirectory.isDirectory()) {
053                pollFile(fileOrDirectory); // process the file
054            }
055            else if (processDir) {
056                log.debug("Polling directory " + fileOrDirectory);
057                File[] files = fileOrDirectory.listFiles();
058                for (int i = 0; i < files.length; i++) {
059                    pollFileOrDirectory(files[i], isRecursive()); // self-recursion
060                }
061            }
062            else {
063                log.debug("Skipping directory " + fileOrDirectory);
064            }
065        }
066    
067        protected void pollFile(final File file) {
068            if (file.exists() && file.lastModified() > lastPollTime) {
069                if (isValidFile(file)) {
070                    processFile(file);
071                }
072            }
073        }
074    
075        protected void processFile(File file) {
076            try {
077                            getProcessor().process(endpoint.createExchange(file));
078                    } catch (Throwable e) {
079                            handleException(e);
080                    }
081        }
082    
083        protected boolean isValidFile(File file) {
084            boolean result = false;
085            if (file != null && file.exists()) {
086                if (isMatched(file)) {
087                    if (isAttemptFileLock()) {
088                        FileChannel fc = null;
089                        try {
090                            fc = new RandomAccessFile(file, "rw").getChannel();
091                            fc.lock();
092                            result = true;
093                        }
094                        catch (Throwable e) {
095                            log.debug("Failed to get the lock on file: " + file, e);
096                        }
097                        finally {
098                            if (fc != null) {
099                                try {
100                                    fc.close();
101                                }
102                                catch (IOException e) {
103                                }
104                            }
105                        }
106                    }
107                    else {
108                        result = true;
109                    }
110                }
111            }
112            return result;
113        }
114    
115        protected boolean isMatched(File file) {
116            boolean result = true;
117            if (regexPattern != null && regexPattern.length() > 0) {
118                result = file.getName().matches(getRegexPattern());
119            }
120            return result;
121        }
122    
123        /**
124         * @return the recursive
125         */
126        public boolean isRecursive() {
127            return this.recursive;
128        }
129    
130        /**
131         * @param recursive the recursive to set
132         */
133        public void setRecursive(boolean recursive) {
134            this.recursive = recursive;
135        }
136    
137        /**
138         * @return the attemptFileLock
139         */
140        public boolean isAttemptFileLock() {
141            return this.attemptFileLock;
142        }
143    
144        /**
145         * @param attemptFileLock the attemptFileLock to set
146         */
147        public void setAttemptFileLock(boolean attemptFileLock) {
148            this.attemptFileLock = attemptFileLock;
149        }
150    
151        /**
152         * @return the regexPattern
153         */
154        public String getRegexPattern() {
155            return this.regexPattern;
156        }
157    
158        /**
159         * @param regexPattern the regexPattern to set
160         */
161        public void setRegexPattern(String regexPattern) {
162            this.regexPattern = regexPattern;
163        }
164    }