/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.Processor;
import org.apache.camel.impl.PollingConsumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Polling consumer that walks the file or directory of a {@link FileEndpoint}
 * and hands every file that changed since the previous poll to the processor
 * as a {@link FileExchange}.
 * <p>
 * A file is processed when its last-modified time is later than the time the
 * previous poll finished, its name matches the optional regular expression,
 * and (when {@link #isAttemptFileLock()} is enabled) an exclusive lock on it
 * could be obtained.
 *
 * @version $Revision: 523016 $
 */
public class FileConsumer extends PollingConsumer<FileExchange> {
    private static final Log log = LogFactory.getLog(FileConsumer.class);

    private final FileEndpoint endpoint;
    private boolean recursive = true;
    private boolean attemptFileLock = false;
    private String regexPattern = "";
    private long lastPollTime = 0L;

    /**
     * Creates a consumer for the given endpoint.
     *
     * @param endpoint the endpoint whose file or directory is polled
     * @param processor the processor that receives an exchange per file
     * @param executor the executor handed to the superclass
     */
    public FileConsumer(final FileEndpoint endpoint, Processor<FileExchange> processor, ScheduledExecutorService executor) {
        super(endpoint, processor, executor);
        this.endpoint = endpoint;
    }

    /**
     * Scans the endpoint's file or directory once and then records the
     * current time as the reference for the next poll.
     * <p>
     * NOTE: the timestamp is taken after the scan, so a file modified after
     * it was examined but before the scan completes will not satisfy the
     * "modified since last poll" check on later polls.
     *
     * @throws Exception if polling fails
     */
    protected void poll() throws Exception {
        // The endpoint's own directory must always be scanned; the recursive
        // flag only decides whether nested directories are descended into.
        pollFileOrDirectory(endpoint.getFile(), true);
        lastPollTime = System.currentTimeMillis();
    }

    /**
     * Polls a single file, or the entries of a directory.
     *
     * @param fileOrDirectory the file or directory to examine
     * @param processDir whether a directory should be scanned; when
     *        {@code false} a directory is skipped
     */
    protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
        if (!fileOrDirectory.isDirectory()) {
            pollFile(fileOrDirectory);
            return;
        }
        if (!processDir) {
            log.debug("Skipping directory " + fileOrDirectory);
            return;
        }
        log.debug("Polling directory " + fileOrDirectory);
        File[] files = fileOrDirectory.listFiles();
        if (files == null) {
            // listFiles() yields null on an I/O error or when the path
            // stopped being a directory between the check and the listing.
            log.debug("Unable to list directory " + fileOrDirectory);
            return;
        }
        for (File file : files) {
            // Nested directories are only entered when recursion is enabled.
            pollFileOrDirectory(file, isRecursive());
        }
    }

    /**
     * Processes the file if it exists, was modified after the last poll and
     * passes {@link #isValidFile(File)}.
     *
     * @param file the file to examine
     */
    protected void pollFile(final File file) {
        if (file.exists() && file.lastModified() > lastPollTime && isValidFile(file)) {
            processFile(file);
        }
    }

    /**
     * Creates an exchange for the file and passes it to the processor.
     *
     * @param file the file to process
     */
    protected void processFile(File file) {
        getProcessor().process(endpoint.createExchange(file));
    }

    /**
     * Decides whether a file may be processed.
     *
     * @param file the candidate file, may be {@code null}
     * @return {@code true} if the file exists, matches the name pattern and,
     *         when file locking is enabled, could be locked exclusively
     */
    protected boolean isValidFile(File file) {
        if (file == null || !file.exists() || !isMatched(file)) {
            return false;
        }
        if (!isAttemptFileLock()) {
            return true;
        }
        return canLock(file);
    }

    /**
     * Probes whether an exclusive lock can be obtained on the file. The lock
     * is not kept: it is released again when the file is closed before this
     * method returns.
     */
    private boolean canLock(File file) {
        RandomAccessFile raf = null;
        try {
            raf = new RandomAccessFile(file, "rw");
            FileChannel channel = raf.getChannel();
            channel.lock();
            return true;
        } catch (Exception e) {
            log.debug("Failed to get the lock on file: " + file, e);
            return false;
        } finally {
            if (raf != null) {
                try {
                    // Closing the file also closes its channel, which
                    // releases the lock acquired above.
                    raf.close();
                } catch (IOException e) {
                    log.debug("Failed to close file after lock attempt: " + file, e);
                }
            }
        }
    }

    /**
     * Checks the file name against the configured regular expression.
     *
     * @param file the file whose name is tested
     * @return {@code true} if no pattern is configured (null or empty) or the
     *         file name matches the pattern
     */
    protected boolean isMatched(File file) {
        String pattern = getRegexPattern();
        if (pattern == null || pattern.length() == 0) {
            return true;
        }
        return file.getName().matches(pattern);
    }

    /**
     * @return the recursive
     */
    public boolean isRecursive() {
        return this.recursive;
    }

    /**
     * @param recursive the recursive to set
     */
    public void setRecursive(boolean recursive) {
        this.recursive = recursive;
    }

    /**
     * @return the attemptFileLock
     */
    public boolean isAttemptFileLock() {
        return this.attemptFileLock;
    }

    /**
     * @param attemptFileLock the attemptFileLock to set
     */
    public void setAttemptFileLock(boolean attemptFileLock) {
        this.attemptFileLock = attemptFileLock;
    }

    /**
     * @return the regexPattern
     */
    public String getRegexPattern() {
        return this.regexPattern;
    }

    /**
     * @param regexPattern the regexPattern to set
     */
    public void setRegexPattern(String regexPattern) {
        this.regexPattern = regexPattern;
    }
}