001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.mail; 018 019 import javax.mail.Flags; 020 import javax.mail.Folder; 021 import javax.mail.Message; 022 import javax.mail.MessagingException; 023 import javax.mail.event.MessageCountEvent; 024 import javax.mail.event.MessageCountListener; 025 026 import org.apache.camel.Processor; 027 import org.apache.camel.impl.ScheduledPollConsumer; 028 import org.apache.commons.logging.Log; 029 import org.apache.commons.logging.LogFactory; 030 031 /** 032 * A {@link Consumer} which consumes messages from JavaMail using a 033 * {@link Transport} and dispatches them to the {@link Processor} 034 * 035 * @version $Revision: 523430 $ 036 */ 037 public class MailConsumer extends ScheduledPollConsumer<MailExchange> implements MessageCountListener { 038 private static final transient Log LOG = LogFactory.getLog(MailConsumer.class); 039 private final MailEndpoint endpoint; 040 private final Folder folder; 041 042 public MailConsumer(MailEndpoint endpoint, Processor processor, Folder folder) { 043 super(endpoint, processor); 044 this.endpoint = endpoint; 045 this.folder = folder; 046 } 047 048 @Override 049 protected void doStart() throws Exception { 050 super.doStart(); 051 ensureFolderIsOpen(); 052 folder.addMessageCountListener(this); 053 } 054 055 @Override 056 protected void doStop() throws Exception { 057 folder.removeMessageCountListener(this); 058 folder.close(true); 059 super.doStop(); 060 } 061 062 public void messagesAdded(MessageCountEvent event) { 063 Message[] messages = event.getMessages(); 064 for (Message message : messages) { 065 try { 066 if (!message.getFlags().contains(Flags.Flag.DELETED)) { 067 processMessage(message); 068 069 flagMessageDeleted(message); 070 } 071 } catch (MessagingException e) { 072 handleException(e); 073 } 074 } 075 } 076 077 public void messagesRemoved(MessageCountEvent event) { 078 Message[] messages = event.getMessages(); 079 for (Message message : messages) { 080 if (LOG.isDebugEnabled()) { 081 try { 082 LOG.debug("Removing message: " + message.getSubject()); 083 } catch (MessagingException e) { 084 LOG.debug("Ignored: " + e); 085 } 086 } 087 } 088 } 089 090 protected void poll() throws Exception { 091 ensureFolderIsOpen(); 092 093 int count = folder.getMessageCount(); 094 if (count > 0) { 095 Message[] messages = folder.getMessages(); 096 MessageCountEvent event = new MessageCountEvent(folder, MessageCountEvent.ADDED, true, messages); 097 messagesAdded(event); 098 } else if (count == -1) { 099 throw new MessagingException("Folder: " + folder.getFullName() + " is closed"); 100 } 101 102 folder.close(true); 103 } 104 105 protected void processMessage(Message message) { 106 try { 107 MailExchange exchange = endpoint.createExchange(message); 108 getProcessor().process(exchange); 109 } catch (Throwable e) { 110 handleException(e); 111 } 112 } 113 114 protected void ensureFolderIsOpen() throws MessagingException { 115 if (!folder.isOpen()) { 116 folder.open(Folder.READ_WRITE); 117 } 118 } 119 120 protected void flagMessageDeleted(Message message) throws MessagingException { 121 if (endpoint.getConfiguration().isDeleteProcessedMessages()) { 122 message.setFlag(Flags.Flag.DELETED, true); 123 } else { 124 message.setFlag(Flags.Flag.SEEN, true); 125 } 126 } 127 }