001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.processor;
019    
020    import org.apache.camel.Exchange;
021    import org.apache.camel.Processor;
022    import org.apache.camel.Message;
023    import org.apache.camel.impl.ServiceSupport;
024    import org.apache.camel.util.ServiceHelper;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    
028    /**
029     * Implements a
030     * <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter Channel</a>
031     * after attempting to redeliver the message using the {@link RedeliveryPolicy}
032     *
033     * @version $Revision: 534145 $
034     */
035    public class DeadLetterChannel extends ServiceSupport implements ErrorHandler {
036        public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
037        public static final String REDELIVERED = "org.apache.camel.Redelivered";
038    
039        private static final transient Log log = LogFactory.getLog(DeadLetterChannel.class);
040        private Processor output;
041        private Processor deadLetter;
042        private RedeliveryPolicy redeliveryPolicy;
043        private Logger logger;
044    
045        public static <E extends Exchange> Logger createDefaultLogger() {
046            return new Logger(log, LoggingLevel.ERROR);
047        }
048    
049        public DeadLetterChannel(Processor output, Processor deadLetter) {
050            this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
051        }
052    
053        public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger) {
054            this.deadLetter = deadLetter;
055            this.output = output;
056            this.redeliveryPolicy = redeliveryPolicy;
057            this.logger = logger;
058        }
059    
060        @Override
061        public String toString() {
062            return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
063        }
064    
065        public void process(Exchange exchange) throws Exception {
066            int redeliveryCounter = 0;
067            long redeliveryDelay = 0;
068    
069            do {
070                if (redeliveryCounter > 0) {
071                    // Figure out how long we should wait to resend this message.
072                    redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
073                    sleep(redeliveryDelay);
074                }
075    
076                try {
077                    output.process(exchange);
078                    return;
079                }
080                catch (RuntimeException e) {
081                    logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
082                }
083                redeliveryCounter = incrementRedeliveryCounter(exchange);
084            }
085            while (redeliveryPolicy.shouldRedeliver(redeliveryCounter));
086    
087            // now lets send to the dead letter queue
088            deadLetter.process(exchange);
089        }
090    
091        // Properties
092        //-------------------------------------------------------------------------
093    
094        /**
095         * Returns the output processor
096         */
097        public Processor getOutput() {
098            return output;
099        }
100    
101        /**
102         * Returns the dead letter that message exchanges will be sent to if the redelivery attempts fail
103         */
104        public Processor getDeadLetter() {
105            return deadLetter;
106        }
107    
108        public RedeliveryPolicy getRedeliveryPolicy() {
109            return redeliveryPolicy;
110        }
111    
112        /**
113         * Sets the redelivery policy
114         */
115        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
116            this.redeliveryPolicy = redeliveryPolicy;
117        }
118    
119        public Logger getLogger() {
120            return logger;
121        }
122    
123        /**
124         * Sets the logger strategy; which {@link Log} to use and which {@link LoggingLevel} to use
125         */
126        public void setLogger(Logger logger) {
127            this.logger = logger;
128        }
129    
130        // Implementation methods
131        //-------------------------------------------------------------------------
132    
133        /**
134         * Increments the redelivery counter and adds the redelivered flag if the message has been redelivered
135         */
136        protected int incrementRedeliveryCounter(Exchange exchange) {
137            Message in = exchange.getIn();
138            Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
139            int next = 1;
140            if (counter != null) {
141                next = counter + 1;
142            }
143            in.setHeader(REDELIVERY_COUNTER, next);
144                in.setHeader(REDELIVERED, true);
145            return next;
146        }
147    
148        protected void sleep(long redeliveryDelay) {
149            if (redeliveryDelay > 0) {
150                if (log.isDebugEnabled()) {
151                    log.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
152                }
153                try {
154                    Thread.sleep(redeliveryDelay);
155                }
156                catch (InterruptedException e) {
157                    if (log.isDebugEnabled()) {
158                        log.debug("Thread interupted: " + e, e);
159                    }
160                }
161            }
162        }
163    
164        protected void doStart() throws Exception {
165            ServiceHelper.startServices(output, deadLetter);
166        }
167    
168        protected void doStop() throws Exception {
169            ServiceHelper.stopServices(deadLetter, output);
170        }
171    }