001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.mail;
018    
019    import javax.mail.Flags;
020    import javax.mail.Folder;
021    import javax.mail.Message;
022    import javax.mail.MessagingException;
023    import javax.mail.event.MessageCountEvent;
024    import javax.mail.event.MessageCountListener;
025    
026    import org.apache.camel.Processor;
027    import org.apache.camel.impl.ScheduledPollConsumer;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * A {@link Consumer} which consumes messages from JavaMail using a
033     * {@link Transport} and dispatches them to the {@link Processor}
034     * 
035     * @version $Revision: 523430 $
036     */
037    public class MailConsumer extends ScheduledPollConsumer<MailExchange> implements MessageCountListener {
038        private static final transient Log LOG = LogFactory.getLog(MailConsumer.class);
039        private final MailEndpoint endpoint;
040        private final Folder folder;
041    
042        public MailConsumer(MailEndpoint endpoint, Processor processor, Folder folder) {
043            super(endpoint, processor);
044            this.endpoint = endpoint;
045            this.folder = folder;
046        }
047    
048        @Override
049        protected void doStart() throws Exception {
050            super.doStart();
051            ensureFolderIsOpen();
052            folder.addMessageCountListener(this);
053        }
054    
055        @Override
056        protected void doStop() throws Exception {
057            folder.removeMessageCountListener(this);
058            folder.close(true);
059            super.doStop();
060        }
061    
062        public void messagesAdded(MessageCountEvent event) {
063            Message[] messages = event.getMessages();
064            for (Message message : messages) {
065                try {
066                    if (!message.getFlags().contains(Flags.Flag.DELETED)) {
067                        processMessage(message);
068    
069                        flagMessageDeleted(message);
070                    }
071                } catch (MessagingException e) {
072                    handleException(e);
073                }
074            }
075        }
076    
077        public void messagesRemoved(MessageCountEvent event) {
078            Message[] messages = event.getMessages();
079            for (Message message : messages) {
080                if (LOG.isDebugEnabled()) {
081                    try {
082                        LOG.debug("Removing message: " + message.getSubject());
083                    } catch (MessagingException e) {
084                        LOG.debug("Ignored: " + e);
085                    }
086                }
087            }
088        }
089    
090        protected void poll() throws Exception {
091            ensureFolderIsOpen();
092    
093            int count = folder.getMessageCount();
094            if (count > 0) {
095                Message[] messages = folder.getMessages();
096                MessageCountEvent event = new MessageCountEvent(folder, MessageCountEvent.ADDED, true, messages);
097                messagesAdded(event);
098            } else if (count == -1) {
099                throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
100            }
101    
102            folder.close(true);
103        }
104    
105        protected void processMessage(Message message) {
106            try {
107                MailExchange exchange = endpoint.createExchange(message);
108                getProcessor().process(exchange);
109            } catch (Throwable e) {
110                handleException(e);
111            }
112        }
113    
114        protected void ensureFolderIsOpen() throws MessagingException {
115            if (!folder.isOpen()) {
116                folder.open(Folder.READ_WRITE);
117            }
118        }
119    
120        protected void flagMessageDeleted(Message message) throws MessagingException {
121            if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
122                message.setFlag(Flags.Flag.DELETED, true);
123            } else {
124                message.setFlag(Flags.Flag.SEEN, true);
125            }
126        }
127    }