001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.file; 019 020 import org.apache.camel.Processor; 021 import org.apache.camel.impl.ScheduledPollConsumer; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 025 import java.io.File; 026 import java.io.IOException; 027 import java.io.RandomAccessFile; 028 import java.nio.channels.FileChannel; 029 030 /** 031 * @version $Revision: 523016 $ 032 */ 033 public class FileConsumer extends ScheduledPollConsumer<FileExchange> { 034 private static final transient Log log = LogFactory.getLog(FileConsumer.class); 035 private final FileEndpoint endpoint; 036 private boolean recursive = true; 037 private boolean attemptFileLock = false; 038 private String regexPattern = ""; 039 private long lastPollTime = 0l; 040 041 public FileConsumer(final FileEndpoint endpoint, Processor processor) { 042 super(endpoint, processor); 043 this.endpoint = endpoint; 044 } 045 046 protected void poll() throws Exception { 047 pollFileOrDirectory(endpoint.getFile(), isRecursive()); 048 lastPollTime = System.currentTimeMillis(); 049 } 050 051 protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) { 052 if (!fileOrDirectory.isDirectory()) { 053 pollFile(fileOrDirectory); // process the file 054 } 055 else if (processDir) { 056 log.debug("Polling directory " + fileOrDirectory); 057 File[] files = fileOrDirectory.listFiles(); 058 for (int i = 0; i < files.length; i++) { 059 pollFileOrDirectory(files[i], isRecursive()); // self-recursion 060 } 061 } 062 else { 063 log.debug("Skipping directory " + fileOrDirectory); 064 } 065 } 066 067 protected void pollFile(final File file) { 068 if (file.exists() && file.lastModified() > lastPollTime) { 069 if (isValidFile(file)) { 070 processFile(file); 071 } 072 } 073 } 074 075 protected void processFile(File file) { 076 try { 077 getProcessor().process(endpoint.createExchange(file)); 078 } catch (Throwable e) { 079 handleException(e); 080 } 081 } 082 083 protected boolean isValidFile(File file) { 084 boolean result = false; 085 if (file != null && file.exists()) { 086 if (isMatched(file)) { 087 if (isAttemptFileLock()) { 088 FileChannel fc = null; 089 try { 090 fc = new RandomAccessFile(file, "rw").getChannel(); 091 fc.lock(); 092 result = true; 093 } 094 catch (Throwable e) { 095 log.debug("Failed to get the lock on file: " + file, e); 096 } 097 finally { 098 if (fc != null) { 099 try { 100 fc.close(); 101 } 102 catch (IOException e) { 103 } 104 } 105 } 106 } 107 else { 108 result = true; 109 } 110 } 111 } 112 return result; 113 } 114 115 protected boolean isMatched(File file) { 116 boolean result = true; 117 if (regexPattern != null && regexPattern.length() > 0) { 118 result = file.getName().matches(getRegexPattern()); 119 } 120 return result; 121 } 122 123 /** 124 * @return the recursive 125 */ 126 public boolean isRecursive() { 127 return this.recursive; 128 } 129 130 /** 131 * @param recursive the recursive to set 132 */ 133 public void setRecursive(boolean recursive) { 134 this.recursive = recursive; 135 } 136 137 /** 138 * @return the attemptFileLock 139 */ 140 public boolean isAttemptFileLock() { 141 return this.attemptFileLock; 142 } 143 144 /** 145 * @param attemptFileLock the attemptFileLock to set 146 */ 147 public void setAttemptFileLock(boolean attemptFileLock) { 148 this.attemptFileLock = attemptFileLock; 149 } 150 151 /** 152 * @return the regexPattern 153 */ 154 public String getRegexPattern() { 155 return this.regexPattern; 156 } 157 158 /** 159 * @param regexPattern the regexPattern to set 160 */ 161 public void setRegexPattern(String regexPattern) { 162 this.regexPattern = regexPattern; 163 } 164 }