001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.mail; 019 020 import org.apache.camel.Consumer; 021 import org.apache.camel.Processor; 022 import org.apache.camel.impl.ScheduledPollConsumer; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 026 import javax.mail.Flags; 027 import javax.mail.Folder; 028 import javax.mail.Message; 029 import javax.mail.MessagingException; 030 import javax.mail.Transport; 031 import javax.mail.event.MessageCountEvent; 032 import javax.mail.event.MessageCountListener; 033 034 /** 035 * A {@link Consumer} which consumes messages from JavaMail using a {@link Transport} and dispatches them 036 * to the {@link Processor} 037 * 038 * @version $Revision: 523430 $ 039 */ 040 public class MailConsumer extends ScheduledPollConsumer<MailExchange> implements MessageCountListener { 041 private static final transient Log log = LogFactory.getLog(MailConsumer.class); 042 private final MailEndpoint endpoint; 043 private final Folder folder; 044 045 public MailConsumer(MailEndpoint endpoint, Processor processor, Folder folder) { 046 super(endpoint, processor); 047 this.endpoint = endpoint; 048 this.folder = folder; 049 } 050 051 @Override 052 protected void doStart() throws Exception { 053 super.doStart(); 054 ensureFolderIsOpen(); 055 folder.addMessageCountListener(this); 056 } 057 058 @Override 059 protected void doStop() throws Exception { 060 folder.removeMessageCountListener(this); 061 folder.close(true); 062 super.doStop(); 063 } 064 065 public void messagesAdded(MessageCountEvent event) { 066 Message[] messages = event.getMessages(); 067 for (Message message : messages) { 068 try { 069 if (!message.getFlags().contains(Flags.Flag.DELETED)) { 070 processMessage(message); 071 072 flagMessageDeleted(message); 073 } 074 } 075 catch (MessagingException e) { 076 handleException(e); 077 } 078 } 079 } 080 081 public void messagesRemoved(MessageCountEvent event) { 082 Message[] messages = event.getMessages(); 083 for (Message message : messages) { 084 if (log.isDebugEnabled()) { 085 try { 086 log.debug("Removing message: " + message.getSubject()); 087 } 088 catch (MessagingException e) { 089 log.debug("Ignored: " + e); 090 } 091 } 092 } 093 } 094 095 protected void poll() throws Exception { 096 ensureFolderIsOpen(); 097 098 int count = folder.getMessageCount(); 099 if (count > 0) { 100 Message[] messages = folder.getMessages(); 101 MessageCountEvent event = new MessageCountEvent(folder, MessageCountEvent.ADDED, true, messages); 102 messagesAdded(event); 103 } 104 else if (count == -1) { 105 throw new MessagingException("Folder: " + folder.getFullName() + " is closed"); 106 } 107 108 folder.close(true); 109 } 110 111 protected void processMessage(Message message) { 112 try { 113 MailExchange exchange = endpoint.createExchange(message); 114 getProcessor().process(exchange); 115 } 116 catch (Throwable e) { 117 handleException(e); 118 } 119 } 120 121 protected void ensureFolderIsOpen() throws MessagingException { 122 if (!folder.isOpen()) { 123 folder.open(Folder.READ_WRITE); 124 } 125 } 126 127 protected void flagMessageDeleted(Message message) throws MessagingException { 128 if (endpoint.getConfiguration().isDeleteProcessedMessages()) { 129 message.setFlag(Flags.Flag.DELETED, true); 130 } 131 else { 132 message.setFlag(Flags.Flag.SEEN, true); 133 } 134 } 135 }