001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.mail;
019    
020    import org.apache.camel.Consumer;
021    import org.apache.camel.Processor;
022    import org.apache.camel.impl.ScheduledPollConsumer;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    
026    import javax.mail.Flags;
027    import javax.mail.Folder;
028    import javax.mail.Message;
029    import javax.mail.MessagingException;
030    import javax.mail.Transport;
031    import javax.mail.event.MessageCountEvent;
032    import javax.mail.event.MessageCountListener;
033    
034    /**
035     * A {@link Consumer} which consumes messages from JavaMail using a {@link Transport} and dispatches them
036     * to the {@link Processor}
037     *
038     * @version $Revision: 523430 $
039     */
040    public class MailConsumer extends ScheduledPollConsumer<MailExchange> implements MessageCountListener {
041        private static final transient Log log = LogFactory.getLog(MailConsumer.class);
042        private final MailEndpoint endpoint;
043        private final Folder folder;
044    
045        public MailConsumer(MailEndpoint endpoint, Processor processor, Folder folder) {
046            super(endpoint, processor);
047            this.endpoint = endpoint;
048            this.folder = folder;
049        }
050    
051        @Override
052        protected void doStart() throws Exception {
053            super.doStart();
054            ensureFolderIsOpen();
055            folder.addMessageCountListener(this);
056        }
057    
058        @Override
059        protected void doStop() throws Exception {
060            folder.removeMessageCountListener(this);
061            folder.close(true);
062            super.doStop();
063        }
064    
065        public void messagesAdded(MessageCountEvent event) {
066            Message[] messages = event.getMessages();
067            for (Message message : messages) {
068                try {
069                    if (!message.getFlags().contains(Flags.Flag.DELETED)) {
070                        processMessage(message);
071    
072                        flagMessageDeleted(message);
073                    }
074                }
075                catch (MessagingException e) {
076                    handleException(e);
077                }
078            }
079        }
080    
081        public void messagesRemoved(MessageCountEvent event) {
082            Message[] messages = event.getMessages();
083            for (Message message : messages) {
084                if (log.isDebugEnabled()) {
085                    try {
086                        log.debug("Removing message: " + message.getSubject());
087                    }
088                    catch (MessagingException e) {
089                        log.debug("Ignored: " + e);
090                    }
091                }
092            }
093        }
094    
095        protected void poll() throws Exception {
096            ensureFolderIsOpen();
097    
098            int count = folder.getMessageCount();
099            if (count > 0) {
100                Message[] messages = folder.getMessages();
101                MessageCountEvent event = new MessageCountEvent(folder, MessageCountEvent.ADDED, true, messages);
102                messagesAdded(event);
103            }
104            else if (count == -1) {
105                throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
106            }
107    
108            folder.close(true);
109        }
110    
111        protected void processMessage(Message message) {
112            try {
113                MailExchange exchange = endpoint.createExchange(message);
114                getProcessor().process(exchange);
115            }
116            catch (Throwable e) {
117                handleException(e);
118            }
119        }
120    
121        protected void ensureFolderIsOpen() throws MessagingException {
122            if (!folder.isOpen()) {
123                folder.open(Folder.READ_WRITE);
124            }
125        }
126    
127        protected void flagMessageDeleted(Message message) throws MessagingException {
128            if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
129                message.setFlag(Flags.Flag.DELETED, true);
130            }
131            else {
132                message.setFlag(Flags.Flag.SEEN, true);
133            }
134        }
135    }