001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.file.remote;
019    
020    import com.jcraft.jsch.ChannelSftp;
021    import org.apache.camel.Processor;
022    
023    import java.io.ByteArrayOutputStream;
024    import java.io.IOException;
025    import java.util.concurrent.ScheduledExecutorService;
026    
027    public class SftpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
028        private boolean recursive = true;
029        private String regexPattern = "";
030        private long lastPollTime = 0L;
031        private final SftpEndpoint endpoint;
032        private ChannelSftp channel;
033    
034        public SftpConsumer(SftpEndpoint endpoint, Processor processor, ChannelSftp channel) {
035            super(endpoint, processor);
036            this.endpoint = endpoint;
037            this.channel = channel;
038        }
039    
040        public SftpConsumer(SftpEndpoint endpoint, Processor processor, ChannelSftp channel, ScheduledExecutorService executor) {
041            super(endpoint, processor, executor);
042            this.endpoint = endpoint;
043            this.channel = channel;
044        }
045    
046        protected void poll() throws Exception {
047            final String fileName = endpoint.getConfiguration().getFile();
048            if (endpoint.getConfiguration().isDirectory()) {
049                pollDirectory(fileName);
050            }
051            else {
052                channel.cd(fileName.substring(0, fileName.lastIndexOf('/')));
053                final ChannelSftp.LsEntry file = (ChannelSftp.LsEntry) channel.ls(fileName.substring(fileName.lastIndexOf('/') + 1)).get(0);
054                pollFile(file);
055            }
056            lastPollTime = System.currentTimeMillis();
057        }
058    
059        protected void pollDirectory(String dir) throws Exception {
060            channel.cd(dir);
061            for (ChannelSftp.LsEntry sftpFile : (ChannelSftp.LsEntry[]) channel.ls(".").toArray(new ChannelSftp.LsEntry[]{})) {
062                if (sftpFile.getFilename().startsWith(".")) {
063                    // skip
064                }
065                else if (sftpFile.getAttrs().isDir()) {
066                    if (isRecursive()) {
067                        pollDirectory(getFullFileName(sftpFile));
068                    }
069                }
070                else {
071                    pollFile(sftpFile);
072                }
073            }
074        }
075    
076        protected String getFullFileName(ChannelSftp.LsEntry sftpFile) throws IOException {
077            return channel.pwd() + "/" + sftpFile.getFilename();
078        }
079    
080        private void pollFile(ChannelSftp.LsEntry sftpFile) throws Exception {
081            if (sftpFile.getAttrs().getMTime() * 1000 > lastPollTime) { // TODO do we need to adjust the TZ?
082                if (isMatched(sftpFile)) {
083                    final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
084                    channel.get(sftpFile.getFilename(), byteArrayOutputStream);
085                    getProcessor().process(endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream));
086                }
087            }
088        }
089    
090        protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
091            boolean result = true;
092            if (regexPattern != null && regexPattern.length() > 0) {
093                result = sftpFile.getFilename().matches(getRegexPattern());
094            }
095            return result;
096        }
097    
098        public boolean isRecursive() {
099            return recursive;
100        }
101    
102        public void setRecursive(boolean recursive) {
103            this.recursive = recursive;
104        }
105    
106        public long getLastPollTime() {
107            return lastPollTime;
108        }
109    
110        public void setLastPollTime(long lastPollTime) {
111            this.lastPollTime = lastPollTime;
112        }
113    
114        public String getRegexPattern() {
115            return regexPattern;
116        }
117    
118        public void setRegexPattern(String regexPattern) {
119            this.regexPattern = regexPattern;
120        }
121    }
122