001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Message;
021    import org.apache.camel.Processor;
022    import org.apache.camel.model.ExceptionType;
023    import org.apache.camel.impl.ServiceSupport;
024    import org.apache.camel.util.ServiceHelper;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    
028    /**
029     * Implements a <a
030     * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
031     * Channel</a> after attempting to redeliver the message using the
032     * {@link RedeliveryPolicy}
033     * 
034     * @version $Revision: 564644 $
035     */
036    public class DeadLetterChannel extends ErrorHandlerSupport {
037        public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
038        public static final String REDELIVERED = "org.apache.camel.Redelivered";
039    
040        private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
041        private Processor output;
042        private Processor deadLetter;
043        private RedeliveryPolicy redeliveryPolicy;
044        private Logger logger;
045    
046        public DeadLetterChannel(Processor output, Processor deadLetter) {
047            this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
048        }
049    
050        public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
051                                 Logger logger) {
052            this.deadLetter = deadLetter;
053            this.output = output;
054            this.redeliveryPolicy = redeliveryPolicy;
055            this.logger = logger;
056        }
057        
058        public static <E extends Exchange> Logger createDefaultLogger() {
059            return new Logger(LOG, LoggingLevel.ERROR);
060        }
061    
062        @Override
063        public String toString() {
064            return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
065        }
066    
067        public void process(Exchange exchange) throws Exception {
068            int redeliveryCounter = 0;
069            long redeliveryDelay = 0;
070    
071            // default behaviour which can be overloaded on a per exception basis
072            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
073            Processor failureProcessor = deadLetter;
074    
075            do {
076                if (redeliveryCounter > 0) {
077                    // Figure out how long we should wait to resend this message.
078                    redeliveryDelay = currentRedeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
079                    sleep(redeliveryDelay);
080                }
081    
082                try {
083                    output.process(exchange);
084                    return;
085                } catch (Throwable e) {
086                    logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
087                    redeliveryCounter = incrementRedeliveryCounter(exchange, e);
088    
089    
090                    ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
091                    if (exceptionPolicy != null) {
092                        currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
093                        Processor processor = exceptionPolicy.getErrorHandler();
094                        if (processor != null) {
095                            failureProcessor = processor;
096                        }
097                    }
098                }
099            } while (currentRedeliveryPolicy.shouldRedeliver(redeliveryCounter));
100    
101            // now lets send to the dead letter queue
102            failureProcessor.process(exchange);
103        }
104    
105        // Properties
106        // -------------------------------------------------------------------------
107    
108        /**
109         * Returns the output processor
110         */
111        public Processor getOutput() {
112            return output;
113        }
114    
115        /**
116         * Returns the dead letter that message exchanges will be sent to if the
117         * redelivery attempts fail
118         */
119        public Processor getDeadLetter() {
120            return deadLetter;
121        }
122    
123        public RedeliveryPolicy getRedeliveryPolicy() {
124            return redeliveryPolicy;
125        }
126    
127        /**
128         * Sets the redelivery policy
129         */
130        public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
131            this.redeliveryPolicy = redeliveryPolicy;
132        }
133    
134        public Logger getLogger() {
135            return logger;
136        }
137    
138        /**
139         * Sets the logger strategy; which {@link Log} to use and which
140         * {@link LoggingLevel} to use
141         */
142        public void setLogger(Logger logger) {
143            this.logger = logger;
144        }
145    
146        // Implementation methods
147        // -------------------------------------------------------------------------
148    
149        /**
150         * Increments the redelivery counter and adds the redelivered flag if the
151         * message has been redelivered
152         */
153        protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
154            Message in = exchange.getIn();
155            Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
156            int next = 1;
157            if (counter != null) {
158                next = counter + 1;
159            }
160            in.setHeader(REDELIVERY_COUNTER, next);
161            in.setHeader(REDELIVERED, true);
162            exchange.setException(e);
163            return next;
164        }
165    
166        protected void sleep(long redeliveryDelay) {
167            if (redeliveryDelay > 0) {
168                if (LOG.isDebugEnabled()) {
169                    LOG.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
170                }
171                try {
172                    Thread.sleep(redeliveryDelay);
173                } catch (InterruptedException e) {
174                    if (LOG.isDebugEnabled()) {
175                        LOG.debug("Thread interupted: " + e, e);
176                    }
177                }
178            }
179        }
180    
181        protected void doStart() throws Exception {
182            ServiceHelper.startServices(output, deadLetter);
183        }
184    
185        protected void doStop() throws Exception {
186            ServiceHelper.stopServices(deadLetter, output);
187        }
188    }