001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file.remote;
018    
019    import java.io.ByteArrayOutputStream;
020    import java.io.IOException;
021    import java.util.concurrent.ScheduledExecutorService;
022    
023    import com.jcraft.jsch.ChannelSftp;
024    
025    import org.apache.camel.Processor;
026    
027    public class SftpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
028        private boolean recursive = true;
029        private String regexPattern = "";
030        private long lastPollTime;
031        private final SftpEndpoint endpoint;
032        private ChannelSftp channel;
033    
034        public SftpConsumer(SftpEndpoint endpoint, Processor processor, ChannelSftp channel) {
035            super(endpoint, processor);
036            this.endpoint = endpoint;
037            this.channel = channel;
038        }
039    
040        public SftpConsumer(SftpEndpoint endpoint, Processor processor, ChannelSftp channel, ScheduledExecutorService executor) {
041            super(endpoint, processor, executor);
042            this.endpoint = endpoint;
043            this.channel = channel;
044        }
045    
046        protected void poll() throws Exception {
047            final String fileName = endpoint.getConfiguration().getFile();
048            if (endpoint.getConfiguration().isDirectory()) {
049                pollDirectory(fileName);
050            } else {
051                channel.cd(fileName.substring(0, fileName.lastIndexOf('/')));
052                final ChannelSftp.LsEntry file = (ChannelSftp.LsEntry)channel.ls(fileName.substring(fileName.lastIndexOf('/') + 1)).get(0);
053                pollFile(file);
054            }
055            lastPollTime = System.currentTimeMillis();
056        }
057    
058        protected void pollDirectory(String dir) throws Exception {
059            channel.cd(dir);
060            for (ChannelSftp.LsEntry sftpFile : (ChannelSftp.LsEntry[])channel.ls(".").toArray(new ChannelSftp.LsEntry[] {})) {
061                if (sftpFile.getFilename().startsWith(".")) {
062                    // skip
063                } else if (sftpFile.getAttrs().isDir()) {
064                    if (isRecursive()) {
065                        pollDirectory(getFullFileName(sftpFile));
066                    }
067                } else {
068                    pollFile(sftpFile);
069                }
070            }
071        }
072    
073        protected String getFullFileName(ChannelSftp.LsEntry sftpFile) throws IOException {
074            return channel.pwd() + "/" + sftpFile.getFilename();
075        }
076    
077        private void pollFile(ChannelSftp.LsEntry sftpFile) throws Exception {
078            if (sftpFile.getAttrs().getMTime() * 1000 > lastPollTime) { // TODO do
079                                                                        // we need
080                                                                        // to adjust
081                                                                        // the TZ?
082                if (isMatched(sftpFile)) {
083                    final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
084                    channel.get(sftpFile.getFilename(), byteArrayOutputStream);
085                    getProcessor().process(endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream));
086                }
087            }
088        }
089    
090        protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
091            boolean result = true;
092            if (regexPattern != null && regexPattern.length() > 0) {
093                result = sftpFile.getFilename().matches(getRegexPattern());
094            }
095            return result;
096        }
097    
098        public boolean isRecursive() {
099            return recursive;
100        }
101    
102        public void setRecursive(boolean recursive) {
103            this.recursive = recursive;
104        }
105    
106        public long getLastPollTime() {
107            return lastPollTime;
108        }
109    
110        public void setLastPollTime(long lastPollTime) {
111            this.lastPollTime = lastPollTime;
112        }
113    
114        public String getRegexPattern() {
115            return regexPattern;
116        }
117    
118        public void setRegexPattern(String regexPattern) {
119            this.regexPattern = regexPattern;
120        }
121    }