001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.file;
019    
020    import java.io.File;
021    import java.io.IOException;
022    import java.io.RandomAccessFile;
023    import java.nio.channels.FileChannel;
024    import java.util.concurrent.ScheduledExecutorService;
025    import org.apache.camel.Processor;
026    import org.apache.camel.impl.PollingConsumer;
027    import org.apache.commons.logging.Log;
028    import org.apache.commons.logging.LogFactory;
029    
030    /**
031     * @version $Revision: 523016 $
032     */
033    public class FileConsumer extends PollingConsumer<FileExchange>{
034        private static final transient Log log=LogFactory.getLog(FileConsumer.class);
035    
036        private final FileEndpoint endpoint;
037        private boolean recursive=true;
038        private boolean attemptFileLock=false;
039        private String regexPattern="";
040        private long lastPollTime=0l;
041    
042        public FileConsumer(final FileEndpoint endpoint,Processor<FileExchange> processor,ScheduledExecutorService executor){
043            super(endpoint,processor,executor);
044            this.endpoint=endpoint;
045        }
046    
047        protected void poll() throws Exception{
048            pollFileOrDirectory(endpoint.getFile(),isRecursive());
049            lastPollTime=System.currentTimeMillis();
050        }
051    
052        protected void pollFileOrDirectory(File fileOrDirectory,boolean processDir){
053            if(!fileOrDirectory.isDirectory()){
054                pollFile(fileOrDirectory); // process the file
055            }else if(processDir){
056                log.debug("Polling directory "+fileOrDirectory);
057                File[] files=fileOrDirectory.listFiles();
058                for(int i=0;i<files.length;i++){
059                    pollFileOrDirectory(files[i],isRecursive()); // self-recursion
060                }
061            }else{
062                log.debug("Skipping directory "+fileOrDirectory);
063            }
064        }
065    
066        protected void pollFile(final File file){
067            if(file.exists()&&file.lastModified()>lastPollTime){
068                if(isValidFile(file)){
069                    processFile(file);
070                }
071            }
072        }
073    
074        protected void processFile(File file){
075            getProcessor().process(endpoint.createExchange(file));
076        }
077    
078        protected boolean isValidFile(File file){
079            boolean result=false;
080            if(file!=null&&file.exists()){
081                if(isMatched(file)){
082                    if(isAttemptFileLock()){
083                        FileChannel fc=null;
084                        try{
085                            fc=new RandomAccessFile(file,"rw").getChannel();
086                            fc.lock();
087                            result=true;
088                        }catch(Throwable e){
089                            log.debug("Failed to get the lock on file: " + file,e);
090                        }finally{
091                            if(fc!=null){
092                                try{
093                                    fc.close();
094                                }catch(IOException e){
095                                }
096                            }
097                        }
098                    }else{
099                        result=true;
100                    }
101                }
102            }
103            return result;
104        }
105    
106        protected boolean isMatched(File file){
107            boolean result=true;
108            if(regexPattern!=null&&regexPattern.length()>0){
109                result=file.getName().matches(getRegexPattern());
110            }
111            return result;
112        }
113    
114        /**
115         * @return the recursive
116         */
117        public boolean isRecursive(){
118            return this.recursive;
119        }
120    
121        /**
122         * @param recursive the recursive to set
123         */
124        public void setRecursive(boolean recursive){
125            this.recursive=recursive;
126        }
127    
128        /**
129         * @return the attemptFileLock
130         */
131        public boolean isAttemptFileLock(){
132            return this.attemptFileLock;
133        }
134    
135        /**
136         * @param attemptFileLock the attemptFileLock to set
137         */
138        public void setAttemptFileLock(boolean attemptFileLock){
139            this.attemptFileLock=attemptFileLock;
140        }
141    
142        /**
143         * @return the regexPattern
144         */
145        public String getRegexPattern(){
146            return this.regexPattern;
147        }
148    
149        /**
150         * @param regexPattern the regexPattern to set
151         */
152        public void setRegexPattern(String regexPattern){
153            this.regexPattern=regexPattern;
154        }
155    }