Coverage Report - org.apache.camel.processor.DeadLetterChannel
 
Classes in this File Line Coverage Branch Coverage Complexity
DeadLetterChannel
79% 
88% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.processor;
 18  
 
 19  
 import org.apache.camel.Exchange;
 20  
 import org.apache.camel.Message;
 21  
 import org.apache.camel.Processor;
 22  
 import org.apache.camel.model.ExceptionType;
 23  
 import org.apache.camel.impl.ServiceSupport;
 24  
 import org.apache.camel.util.ServiceHelper;
 25  
 import org.apache.commons.logging.Log;
 26  
 import org.apache.commons.logging.LogFactory;
 27  
 
 28  
 /**
 29  
  * Implements a <a
 30  
  * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
 31  
  * Channel</a> after attempting to redeliver the message using the
 32  
  * {@link RedeliveryPolicy}
 33  
  * 
 34  
  * @version $Revision: 564644 $
 35  
  */
 36  
 public class DeadLetterChannel extends ErrorHandlerSupport {
 37  
     public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
 38  
     public static final String REDELIVERED = "org.apache.camel.Redelivered";
 39  
 
 40  3
     private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
 41  
     private Processor output;
 42  
     private Processor deadLetter;
 43  
     private RedeliveryPolicy redeliveryPolicy;
 44  
     private Logger logger;
 45  
 
 46  
     public DeadLetterChannel(Processor output, Processor deadLetter) {
 47  0
         this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
 48  0
     }
 49  
 
 50  
     public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
 51  291
                              Logger logger) {
 52  291
         this.deadLetter = deadLetter;
 53  291
         this.output = output;
 54  291
         this.redeliveryPolicy = redeliveryPolicy;
 55  291
         this.logger = logger;
 56  291
     }
 57  
     
 58  
     public static <E extends Exchange> Logger createDefaultLogger() {
 59  327
         return new Logger(LOG, LoggingLevel.ERROR);
 60  
     }
 61  
 
 62  
     @Override
 63  
     public String toString() {
 64  297
         return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
 65  
     }
 66  
 
 67  
     public void process(Exchange exchange) throws Exception {
 68  372
         int redeliveryCounter = 0;
 69  372
         long redeliveryDelay = 0;
 70  
 
 71  
         // default behaviour which can be overloaded on a per exception basis
 72  372
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
 73  372
         Processor failureProcessor = deadLetter;
 74  
 
 75  
         do {
 76  381
             if (redeliveryCounter > 0) {
 77  
                 // Figure out how long we should wait to resend this message.
 78  9
                 redeliveryDelay = currentRedeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
 79  9
                 sleep(redeliveryDelay);
 80  
             }
 81  
 
 82  
             try {
 83  381
                 output.process(exchange);
 84  360
                 return;
 85  21
             } catch (Throwable e) {
 86  21
                 logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
 87  21
                 redeliveryCounter = incrementRedeliveryCounter(exchange, e);
 88  
 
 89  
 
 90  21
                 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
 91  21
                 if (exceptionPolicy != null) {
 92  12
                     currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
 93  12
                     Processor processor = exceptionPolicy.getErrorHandler();
 94  12
                     if (processor != null) {
 95  12
                         failureProcessor = processor;
 96  
                     }
 97  
                 }
 98  
             }
 99  21
         } while (currentRedeliveryPolicy.shouldRedeliver(redeliveryCounter));
 100  
 
 101  
         // now lets send to the dead letter queue
 102  12
         failureProcessor.process(exchange);
 103  12
     }
 104  
 
 105  
     // Properties
 106  
     // -------------------------------------------------------------------------
 107  
 
 108  
     /**
 109  
      * Returns the output processor
 110  
      */
 111  
     public Processor getOutput() {
 112  39
         return output;
 113  
     }
 114  
 
 115  
     /**
 116  
      * Returns the dead letter that message exchanges will be sent to if the
 117  
      * redelivery attempts fail
 118  
      */
 119  
     public Processor getDeadLetter() {
 120  0
         return deadLetter;
 121  
     }
 122  
 
 123  
     public RedeliveryPolicy getRedeliveryPolicy() {
 124  0
         return redeliveryPolicy;
 125  
     }
 126  
 
 127  
     /**
 128  
      * Sets the redelivery policy
 129  
      */
 130  
     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
 131  0
         this.redeliveryPolicy = redeliveryPolicy;
 132  0
     }
 133  
 
 134  
     public Logger getLogger() {
 135  0
         return logger;
 136  
     }
 137  
 
 138  
     /**
 139  
      * Sets the logger strategy; which {@link Log} to use and which
 140  
      * {@link LoggingLevel} to use
 141  
      */
 142  
     public void setLogger(Logger logger) {
 143  0
         this.logger = logger;
 144  0
     }
 145  
 
 146  
     // Implementation methods
 147  
     // -------------------------------------------------------------------------
 148  
 
 149  
     /**
 150  
      * Increments the redelivery counter and adds the redelivered flag if the
 151  
      * message has been redelivered
 152  
      */
 153  
     protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
 154  21
         Message in = exchange.getIn();
 155  21
         Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
 156  21
         int next = 1;
 157  21
         if (counter != null) {
 158  6
             next = counter + 1;
 159  
         }
 160  21
         in.setHeader(REDELIVERY_COUNTER, next);
 161  21
         in.setHeader(REDELIVERED, true);
 162  21
         exchange.setException(e);
 163  21
         return next;
 164  
     }
 165  
 
 166  
     protected void sleep(long redeliveryDelay) {
 167  9
         if (redeliveryDelay > 0) {
 168  9
             if (LOG.isDebugEnabled()) {
 169  0
                 LOG.debug("Sleeping for: " + redeliveryDelay + " until attempting redelivery");
 170  
             }
 171  
             try {
 172  9
                 Thread.sleep(redeliveryDelay);
 173  0
             } catch (InterruptedException e) {
 174  0
                 if (LOG.isDebugEnabled()) {
 175  0
                     LOG.debug("Thread interupted: " + e, e);
 176  
                 }
 177  9
             }
 178  
         }
 179  9
     }
 180  
 
 181  
     protected void doStart() throws Exception {
 182  252
         ServiceHelper.startServices(output, deadLetter);
 183  252
     }
 184  
 
 185  
     protected void doStop() throws Exception {
 186  249
         ServiceHelper.stopServices(deadLetter, output);
 187  249
     }
 188  
 }