001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file.remote;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.util.concurrent.ScheduledExecutorService;
022    
023    import org.apache.camel.Processor;
024    import org.apache.commons.net.ftp.FTPClient;
025    import org.apache.commons.net.ftp.FTPFile;
026    
027    public class FtpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
028        private boolean recursive = true;
029        private String regexPattern = "";
030        private long lastPollTime;
031        private final FtpEndpoint endpoint;
032        private FTPClient client;
033    
034        public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client) {
035            super(endpoint, processor);
036            this.endpoint = endpoint;
037            this.client = client;
038        }
039    
040        public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient client, ScheduledExecutorService executor) {
041            super(endpoint, processor, executor);
042            this.endpoint = endpoint;
043            this.client = client;
044        }
045    
046        protected void poll() throws Exception {
047            final String fileName = endpoint.getConfiguration().getFile();
048            if (endpoint.getConfiguration().isDirectory()) {
049                pollDirectory(fileName);
050            } else {
051                client.changeWorkingDirectory(fileName.substring(0, fileName.lastIndexOf('/')));
052                final FTPFile[] files = client.listFiles(fileName.substring(fileName.lastIndexOf('/') + 1));
053                pollFile(files[0]);
054            }
055            lastPollTime = System.currentTimeMillis();
056        }
057    
058        protected void pollDirectory(String dir) throws Exception {
059            client.changeWorkingDirectory(dir);
060            for (FTPFile ftpFile : client.listFiles()) {
061                if (ftpFile.isFile()) {
062                    pollFile(ftpFile);
063                } else if (ftpFile.isDirectory()) {
064                    if (isRecursive()) {
065                        pollDirectory(getFullFileName(ftpFile));
066                    }
067                } else {
068                    throw new RuntimeException("");
069                }
070            }
071        }
072    
073        protected String getFullFileName(FTPFile ftpFile) throws IOException {
074            return client.printWorkingDirectory() + "/" + ftpFile.getName();
075        }
076    
077        private void pollFile(FTPFile ftpFile) throws Exception {
078            if (ftpFile.getTimestamp().getTimeInMillis() > lastPollTime) { // TODO
079                                                                            // do we
080                                                                            // need
081                                                                            // to
082                                                                            // adjust
083                                                                            // the
084                                                                            // TZ?
085                                                                            // can
086                                                                            // we?
087                if (isMatched(ftpFile)) {
088                    final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
089                    client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
090                    getProcessor().process(endpoint.createExchange(getFullFileName(ftpFile), byteArrayOutputStream));
091                }
092            }
093        }
094    
095        protected boolean isMatched(FTPFile file) {
096            boolean result = true;
097            if (regexPattern != null && regexPattern.length() > 0) {
098                result = file.getName().matches(getRegexPattern());
099            }
100            return result;
101        }
102    
103        public boolean isRecursive() {
104            return recursive;
105        }
106    
107        public void setRecursive(boolean recursive) {
108            this.recursive = recursive;
109        }
110    
111        public long getLastPollTime() {
112            return lastPollTime;
113        }
114    
115        public void setLastPollTime(long lastPollTime) {
116            this.lastPollTime = lastPollTime;
117        }
118    
119        public String getRegexPattern() {
120            return regexPattern;
121        }
122    
123        public void setRegexPattern(String regexPattern) {
124            this.regexPattern = regexPattern;
125        }
126    }