001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.RejectedExecutionException;
020    
021    import org.apache.camel.Exchange;
022    import org.apache.camel.LoggingLevel;
023    import org.apache.camel.Message;
024    import org.apache.camel.Predicate;
025    import org.apache.camel.Processor;
026    import org.apache.camel.model.OnExceptionDefinition;
027    import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
028    import org.apache.camel.util.ExchangeHelper;
029    import org.apache.camel.util.MessageHelper;
030    import org.apache.camel.util.ServiceHelper;
031    
032    /**
033     * Implements a <a
034     * href="http://camel.apache.org/dead-letter-channel.html">Dead Letter
035     * Channel</a> after attempting to redeliver the message using the
036     * {@link RedeliveryPolicy}
037     *
038     * @version $Revision: 774230 $
039     */
040    public class DeadLetterChannel extends ErrorHandlerSupport implements Processor {
041    
042        // TODO: Introduce option to allow async redelivery, eg to not block thread while delay
043        // (eg the Timer task code). However we should consider using Channels that has internal
044        // producer/consumer queues with "delayed" support so a redelivery is just to move an
045        // exchange to this channel with the computed delay time
046        // we need to provide option so end users can decide if they would like to spawn an async thread
047        // or not. Also consider MEP as InOut does not work with async then as the original caller thread
048        // is expecting a reply in the sync thread.
049    
050        // we can use a single shared static timer for async redeliveries
051        private final Processor deadLetter;
052        private final String deadLetterUri;
053        private final Processor output;
054        private final Processor redeliveryProcessor;
055        private final RedeliveryPolicy redeliveryPolicy;
056        private final Predicate handledPolicy;
057        private final Logger logger;
058        private final boolean useOriginalBodyPolicy;
059    
060        private class RedeliveryData {
061            int redeliveryCounter;
062            long redeliveryDelay;
063            Predicate retryUntilPredicate;
064    
065            // default behavior which can be overloaded on a per exception basis
066            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
067            Processor deadLetterQueue = deadLetter;
068            Processor onRedeliveryProcessor = redeliveryProcessor;
069            Predicate handledPredicate = handledPolicy;
070            boolean useOriginalInBody = useOriginalBodyPolicy;
071        }
072        
073        /**
074         * Creates the dead letter channel.
075         *
076         * @param output                    outer processor that should use this dead letter channel
077         * @param deadLetter                the failure processor to send failed exchanges to
078         * @param deadLetterUri             an optional uri for logging purpose
079         * @param redeliveryProcessor       an optional processor to run before redelivert attempt
080         * @param redeliveryPolicy          policy for redelivery
081         * @param logger                    logger to use for logging failures and redelivery attempts
082         * @param exceptionPolicyStrategy   strategy for onException handling
083         * @param handledPolicy             policy for handling failed exception that are moved to the dead letter queue
084         * @param useOriginalBodyPolicy   should the original IN body be moved to the dead letter queue or the current exchange IN body?
085         */
086        public DeadLetterChannel(Processor output, Processor deadLetter, String deadLetterUri, Processor redeliveryProcessor,
087                                 RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy,
088                                 Predicate handledPolicy, boolean useOriginalBodyPolicy) {
089            this.output = output;
090            this.deadLetter = deadLetter;
091            this.deadLetterUri = deadLetterUri;
092            this.redeliveryProcessor = redeliveryProcessor;
093            this.redeliveryPolicy = redeliveryPolicy;
094            this.logger = logger;
095            this.handledPolicy = handledPolicy;
096            this.useOriginalBodyPolicy = useOriginalBodyPolicy;
097            setExceptionPolicy(exceptionPolicyStrategy);
098        }
099    
100        @Override
101        public String toString() {
102            return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]";
103        }
104    
105        public boolean supportTransacted() {
106            return false;
107        }
108    
109        public void process(Exchange exchange) throws Exception {
110            processErrorHandler(exchange, new RedeliveryData());
111        }
112    
113        /**
114         * Processes the exchange decorated with this dead letter channel.
115         */
116        protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) {
117    
118            while (true) {
119                // we can't keep retrying if the route is being shutdown.
120                if (!isRunAllowed()) {
121                    if (log.isDebugEnabled()) {
122                        log.debug("Rejected execution as we are not started for exchange: " + exchange);
123                    }
124                    if (exchange.getException() == null) {
125                        exchange.setException(new RejectedExecutionException());
126                        return;
127                    }
128                }
129    
130                // do not handle transacted exchanges that failed as this error handler does not support it
131                if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
132                    if (log.isDebugEnabled()) {
133                        log.debug("This error handler does not support transacted exchanges."
134                            + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
135                    }
136                    return;
137                }
138    
139                // did previous processing caused an exception?
140                if (exchange.getException() != null) {
141                    handleException(exchange, data);
142                }
143    
144                // compute if we should redeliver or not
145                boolean shouldRedeliver = shouldRedeliver(exchange, data);
146                if (!shouldRedeliver) {
147                    // no then move it to the dead letter queue
148                    deliverToDeadLetterQueue(exchange, data);
149                    // and we are finished since the exchanged was moved to the dead letter queue
150                    return;
151                }
152    
153                // if we are redelivering then sleep before trying again
154                if (data.redeliveryCounter > 0) {
155                    prepareExchangeForRedelivery(exchange);
156    
157                    // wait until we should redeliver
158                    try {
159                        data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter);
160                    } catch (InterruptedException e) {
161                        log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
162                        // continue from top
163                        continue;
164                    }
165    
166                    // letting onRedeliver be executed
167                    deliverToRedeliveryProcessor(exchange, data);
168                }
169    
170                // process the exchange
171                try {
172                    output.process(exchange);
173                } catch (Exception e) {
174                    exchange.setException(e);
175                }
176    
177                // only process if the exchange hasn't failed
178                // and it has not been handled by the error processor
179                boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
180                if (done) {
181                    return;
182                }
183                // error occurred so loop back around.....
184            }
185    
186        }
187    
188        // Properties
189        // -------------------------------------------------------------------------
190    
191        /**
192         * Returns the output processor
193         */
194        public Processor getOutput() {
195            return output;
196        }
197    
198        /**
199         * Returns the dead letter that message exchanges will be sent to if the
200         * redelivery attempts fail
201         */
202        public Processor getDeadLetter() {
203            return deadLetter;
204        }
205    
206        public RedeliveryPolicy getRedeliveryPolicy() {
207            return redeliveryPolicy;
208        }
209    
210        public Logger getLogger() {
211            return logger;
212        }
213    
214        // Implementation methods
215        // -------------------------------------------------------------------------
216    
217        private void prepareExchangeForRedelivery(Exchange exchange) {
218            // okay we will give it another go so clear the exception so we can try again
219            if (exchange.getException() != null) {
220                exchange.setException(null);
221            }
222    
223            // clear rollback flags
224            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
225    
226            // reset cached streams so they can be read again
227            MessageHelper.resetStreamCache(exchange.getIn());
228        }
229    
230        private void handleException(Exchange exchange, RedeliveryData data) {
231            Throwable e = exchange.getException();
232    
233            // store the original caused exception in a property, so we can restore it later
234            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
235    
236            // find the error handler to use (if any)
237            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
238            if (exceptionPolicy != null) {
239                data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
240                data.handledPredicate = exceptionPolicy.getHandledPolicy();
241                data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
242                data.useOriginalInBody = exceptionPolicy.getUseOriginalBodyPolicy();
243    
244                // route specific failure handler?
245                Processor processor = exceptionPolicy.getErrorHandler();
246                if (processor != null) {
247                    data.deadLetterQueue = processor;
248                }
249                // route specific on redelivey?
250                processor = exceptionPolicy.getOnRedelivery();
251                if (processor != null) {
252                    data.onRedeliveryProcessor = processor;
253                }
254            }
255    
256            String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
257                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
258            logFailedDelivery(true, exchange, msg, data, e);
259    
260            data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
261        }
262    
263        /**
264         * Gives an optional configure redelivery processor a chance to process before the Exchange
265         * will be redelivered. This can be used to alter the Exchange.
266         */
267        private void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
268            if (data.onRedeliveryProcessor == null) {
269                return;
270            }
271    
272            if (log.isTraceEnabled()) {
273                log.trace("RedeliveryProcessor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
274            }
275    
276            try {
277                data.onRedeliveryProcessor.process(exchange);
278            } catch (Exception e) {
279                exchange.setException(e);
280            }
281            log.trace("Redelivery processor done");
282        }
283    
284        /**
285         * All redelivery attempts failed so move the exchange to the dead letter queue
286         */
287        private void deliverToDeadLetterQueue(final Exchange exchange, final RedeliveryData data) {
288            if (data.deadLetterQueue == null) {
289                return;
290            }
291    
292            // we did not success with the redelivery so now we let the failure processor handle it
293            ExchangeHelper.setFailureHandled(exchange);
294            // must decrement the redelivery counter as we didn't process the redelivery but is
295            // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
296            decrementRedeliveryCounter(exchange);
297            // reset cached streams so they can be read again
298            MessageHelper.resetStreamCache(exchange.getIn());
299    
300            // prepare original IN body if it should be moved instead of current body
301            if (data.useOriginalInBody) {
302                if (log.isTraceEnabled()) {
303                    log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body");
304                }
305    
306                Object original = exchange.getUnitOfWork().getOriginalInBody();
307                exchange.getIn().setBody(original);
308            }
309    
310            if (log.isTraceEnabled()) {
311                log.trace("DeadLetterQueue " + data.deadLetterQueue + " is processing Exchange: " + exchange);
312            }
313            try {
314                data.deadLetterQueue.process(exchange);
315            } catch (Exception e) {
316                exchange.setException(e);
317            }
318            log.trace("DedLetterQueue processor done");
319    
320            prepareExchangeAfterMovedToDeadLetterQueue(exchange, data.handledPredicate);
321    
322            String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
323                    + ". Moved to the dead letter queue: " + data.deadLetterQueue;
324            logFailedDelivery(false, exchange, msg, data, null);
325        }
326    
327        private void prepareExchangeAfterMovedToDeadLetterQueue(Exchange exchange, Predicate handledPredicate) {
328            if (handledPredicate == null || !handledPredicate.matches(exchange)) {
329                if (log.isDebugEnabled()) {
330                    log.debug("This exchange is not handled so its marked as failed: " + exchange);
331                }
332                // exception not handled, put exception back in the exchange
333                exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.FALSE);
334                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
335            } else {
336                if (log.isDebugEnabled()) {
337                    log.debug("This exchange is handled so its marked as not failed: " + exchange);
338                }
339                exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE);
340            }
341        }
342    
343        private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) {
344            LoggingLevel newLogLevel;
345            if (shouldRedeliver) {
346                newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
347            } else {
348                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
349            }
350            if (exchange.isRollbackOnly()) {
351                String msg = "Rollback exchange";
352                if (exchange.getException() != null) {
353                    msg = msg + " due: " + exchange.getException().getMessage();
354                }
355                if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) {
356                    // log intented rollback on maximum WARN level (no ERROR or FATAL)
357                    logger.log(msg, LoggingLevel.WARN);
358                } else {
359                    // otherwise use the desired logging level
360                    logger.log(msg, newLogLevel);
361                }
362            } else if (data.currentRedeliveryPolicy.isLogStackTrace() && e != null) {
363                logger.log(message, e, newLogLevel);
364            } else {
365                logger.log(message, newLogLevel);
366            }
367        }
368    
369        private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
370            return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate);
371        }
372    
373        /**
374         * Increments the redelivery counter and adds the redelivered flag if the
375         * message has been redelivered
376         */
377        private int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
378            Message in = exchange.getIn();
379            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
380            int next = 1;
381            if (counter != null) {
382                next = counter + 1;
383            }
384            in.setHeader(Exchange.REDELIVERY_COUNTER, next);
385            in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
386            return next;
387        }
388    
389        /**
390         * Prepares the redelivery counter and boolean flag for the failure handle processor
391         */
392        private void decrementRedeliveryCounter(Exchange exchange) {
393            Message in = exchange.getIn();
394            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
395            if (counter != null) {
396                int prev = counter - 1;
397                in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
398                // set boolean flag according to counter
399                in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
400            } else {
401                // not redelivered
402                in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
403                in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
404            }
405        }
406    
407        @Override
408        protected void doStart() throws Exception {
409            ServiceHelper.startServices(output, deadLetter);
410        }
411    
412        @Override
413        protected void doStop() throws Exception {
414            ServiceHelper.stopServices(deadLetter, output);
415        }    
416        
417    }