001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import org.apache.camel.Processor; 020 import org.apache.camel.component.file.strategy.FileProcessStrategy; 021 import org.apache.camel.impl.ScheduledPollConsumer; 022 import org.apache.commons.logging.Log; 023 import org.apache.commons.logging.LogFactory; 024 025 import java.io.File; 026 027 /** 028 * @version $Revision: 523016 $ 029 */ 030 public class FileConsumer extends ScheduledPollConsumer<FileExchange> { 031 private static final transient Log LOG = LogFactory.getLog(FileConsumer.class); 032 private final FileEndpoint endpoint; 033 private boolean recursive = true; 034 private String regexPattern = ""; 035 private long lastPollTime; 036 037 public FileConsumer(final FileEndpoint endpoint, Processor processor) { 038 super(endpoint, processor); 039 this.endpoint = endpoint; 040 } 041 042 protected void poll() throws Exception { 043 pollFileOrDirectory(endpoint.getFile(), isRecursive()); 044 lastPollTime = System.currentTimeMillis(); 045 } 046 047 protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) { 048 if (!fileOrDirectory.isDirectory()) { 049 pollFile(fileOrDirectory); // process the file 050 } 051 else if (processDir) { 052 if (isValidFile(fileOrDirectory)) { 053 LOG.debug("Polling directory " + fileOrDirectory); 054 File[] files = fileOrDirectory.listFiles(); 055 for (int i = 0; i < files.length; i++) { 056 pollFileOrDirectory(files[i], isRecursive()); // self-recursion 057 } 058 } 059 } 060 else { 061 LOG.debug("Skipping directory " + fileOrDirectory); 062 } 063 } 064 065 protected void pollFile(final File file) { 066 if (!file.exists()) { 067 return; 068 } 069 if (isValidFile(file)) { 070 // we only care about file modified times if we are not deleting/moving files 071 if (endpoint.isNoop()) { 072 long fileModified = file.lastModified(); 073 if (fileModified <= lastPollTime) { 074 if (LOG.isDebugEnabled()) { 075 LOG.debug("Ignoring file: " + file + " as modified time: " + fileModified + " less than last poll time: " + lastPollTime); 076 } 077 return; 078 } 079 } 080 081 FileProcessStrategy processStrategy = endpoint.getFileStrategy(); 082 FileExchange exchange = endpoint.createExchange(file); 083 084 try { 085 if (LOG.isDebugEnabled()) { 086 LOG.debug("About to process file: " + file + " using exchange: " + exchange); 087 } 088 if (processStrategy.begin(endpoint, exchange, file)) { 089 getProcessor().process(exchange); 090 processStrategy.commit(endpoint, exchange, file); 091 } 092 else { 093 if (LOG.isDebugEnabled()) { 094 LOG.debug(endpoint + " cannot process file: " + file); 095 } 096 } 097 } 098 catch (Throwable e) { 099 handleException(e); 100 } 101 } 102 } 103 104 protected boolean isValidFile(File file) { 105 boolean result = false; 106 if (file != null && file.exists()) { 107 if (isMatched(file)) { 108 result = true; 109 } 110 } 111 return result; 112 } 113 114 protected boolean isMatched(File file) { 115 String name = file.getName(); 116 if (regexPattern != null && regexPattern.length() > 0) { 117 if (!name.matches(getRegexPattern())) { 118 return false; 119 } 120 } 121 String[] prefixes = endpoint.getExcludedNamePrefixes(); 122 if (prefixes != null) { 123 for (String prefix : prefixes) { 124 if (name.startsWith(prefix)) { 125 return false; 126 } 127 } 128 } 129 String[] postfixes = endpoint.getExcludedNamePostfixes(); 130 if (postfixes != null) { 131 for (String postfix : postfixes) { 132 if (name.endsWith(postfix)) { 133 return false; 134 } 135 } 136 } 137 return true; 138 } 139 140 /** 141 * @return the recursive 142 */ 143 public boolean isRecursive() { 144 return this.recursive; 145 } 146 147 /** 148 * @param recursive the recursive to set 149 */ 150 public void setRecursive(boolean recursive) { 151 this.recursive = recursive; 152 } 153 154 /** 155 * @return the regexPattern 156 */ 157 public String getRegexPattern() { 158 return this.regexPattern; 159 } 160 161 /** 162 * @param regexPattern the regexPattern to set 163 */ 164 public void setRegexPattern(String regexPattern) { 165 this.regexPattern = regexPattern; 166 } 167 }