001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.RejectedExecutionException; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.ExchangeProperty; 025 import org.apache.camel.Message; 026 import org.apache.camel.Processor; 027 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; 028 import org.apache.camel.model.ExceptionType; 029 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; 030 import org.apache.camel.util.AsyncProcessorHelper; 031 import org.apache.camel.util.ServiceHelper; 032 import org.apache.commons.logging.Log; 033 import org.apache.commons.logging.LogFactory; 034 035 /** 036 * Implements a <a 037 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter 038 * Channel</a> after attempting to redeliver the message using the 039 * {@link RedeliveryPolicy} 040 * 041 * @version $Revision: 674036 $ 042 */ 043 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor { 044 public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter"; 045 public static final String REDELIVERED = "org.apache.camel.Redelivered"; 046 public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException"; 047 048 private class RedeliveryData { 049 int redeliveryCounter; 050 long redeliveryDelay; 051 boolean sync = true; 052 053 // default behaviour which can be overloaded on a per exception basis 054 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 055 Processor failureProcessor = deadLetter; 056 } 057 058 private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class); 059 private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED"; 060 private Processor output; 061 private Processor deadLetter; 062 private AsyncProcessor outputAsync; 063 private RedeliveryPolicy redeliveryPolicy; 064 private Logger logger; 065 066 public DeadLetterChannel(Processor output, Processor deadLetter) { 067 this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(), 068 ErrorHandlerSupport.createDefaultExceptionPolicyStrategy()); 069 } 070 071 public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) { 072 this.deadLetter = deadLetter; 073 this.output = output; 074 this.outputAsync = AsyncProcessorTypeConverter.convert(output); 075 076 this.redeliveryPolicy = redeliveryPolicy; 077 this.logger = logger; 078 setExceptionPolicy(exceptionPolicyStrategy); 079 } 080 081 public static <E extends Exchange> Logger createDefaultLogger() { 082 return new Logger(LOG, LoggingLevel.ERROR); 083 } 084 085 @Override 086 public String toString() { 087 return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]"; 088 } 089 090 public boolean process(Exchange exchange, final AsyncCallback callback) { 091 return process(exchange, callback, new RedeliveryData()); 092 } 093 094 public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 095 096 while (true) { 097 // We can't keep retrying if the route is being shutdown. 098 if (!isRunAllowed()) { 099 if (exchange.getException() == null) { 100 exchange.setException(new RejectedExecutionException()); 101 } 102 callback.done(data.sync); 103 return data.sync; 104 } 105 106 // if the exchange is transacted then let the underlysing system handle the redelivery etc. 107 // this DeadLetterChannel is only for non transacted exchanges 108 if (exchange.isTransacted() && exchange.getException() != null) { 109 if (LOG.isDebugEnabled()) { 110 LOG.debug("Transacted Exchange, this DeadLetterChannel is bypassed: " + exchange); 111 } 112 return data.sync; 113 } 114 115 if (exchange.getException() != null) { 116 Throwable e = exchange.getException(); 117 exchange.setException(null); // Reset it since we are handling it. 118 119 logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e); 120 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e); 121 122 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e); 123 if (exceptionPolicy != null) { 124 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy); 125 Processor processor = exceptionPolicy.getErrorHandler(); 126 if (processor != null) { 127 data.failureProcessor = processor; 128 } 129 } 130 } 131 132 if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) { 133 setFailureHandled(exchange, true); 134 AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor); 135 boolean sync = afp.process(exchange, new AsyncCallback() { 136 public void done(boolean sync) { 137 restoreExceptionOnExchange(exchange); 138 callback.done(data.sync); 139 } 140 }); 141 142 restoreExceptionOnExchange(exchange); 143 logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor); 144 return sync; 145 } 146 147 if (data.redeliveryCounter > 0) { 148 // Figure out how long we should wait to resend this message. 149 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 150 } 151 152 exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, exchange.getException()); 153 exchange.setException(null); 154 155 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 156 public void done(boolean sync) { 157 // Only handle the async case... 158 if (sync) { 159 return; 160 } 161 data.sync = false; 162 if (exchange.getException() != null) { 163 process(exchange, callback, data); 164 } else { 165 callback.done(sync); 166 } 167 } 168 }); 169 if (!sync) { 170 // It is going to be processed async.. 171 return false; 172 } 173 if (exchange.getException() == null || isFailureHandled(exchange)) { 174 // If everything went well.. then we exit here.. 175 callback.done(true); 176 return true; 177 } 178 // error occurred so loop back around..... 179 } 180 181 } 182 183 public static boolean isFailureHandled(Exchange exchange) { 184 return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null; 185 } 186 187 public static void setFailureHandled(Exchange exchange, boolean isHandled) { 188 if (isHandled) { 189 exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException()); 190 exchange.setException(null); 191 } else { 192 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); 193 exchange.removeProperty(FAILURE_HANDLED_PROPERTY); 194 } 195 196 } 197 198 public static void restoreExceptionOnExchange(Exchange exchange) { 199 exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class)); 200 } 201 202 public void process(Exchange exchange) throws Exception { 203 AsyncProcessorHelper.process(this, exchange); 204 } 205 206 // Properties 207 // ------------------------------------------------------------------------- 208 209 /** 210 * Returns the output processor 211 */ 212 public Processor getOutput() { 213 return output; 214 } 215 216 /** 217 * Returns the dead letter that message exchanges will be sent to if the 218 * redelivery attempts fail 219 */ 220 public Processor getDeadLetter() { 221 return deadLetter; 222 } 223 224 public RedeliveryPolicy getRedeliveryPolicy() { 225 return redeliveryPolicy; 226 } 227 228 /** 229 * Sets the redelivery policy 230 */ 231 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 232 this.redeliveryPolicy = redeliveryPolicy; 233 } 234 235 public Logger getLogger() { 236 return logger; 237 } 238 239 /** 240 * Sets the logger strategy; which {@link Log} to use and which 241 * {@link LoggingLevel} to use 242 */ 243 public void setLogger(Logger logger) { 244 this.logger = logger; 245 } 246 247 // Implementation methods 248 // ------------------------------------------------------------------------- 249 250 /** 251 * Increments the redelivery counter and adds the redelivered flag if the 252 * message has been redelivered 253 */ 254 protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) { 255 Message in = exchange.getIn(); 256 Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class); 257 int next = 1; 258 if (counter != null) { 259 next = counter + 1; 260 } 261 in.setHeader(REDELIVERY_COUNTER, next); 262 in.setHeader(REDELIVERED, Boolean.TRUE); 263 exchange.setException(e); 264 return next; 265 } 266 267 @Override 268 protected void doStart() throws Exception { 269 ServiceHelper.startServices(output, deadLetter); 270 } 271 272 @Override 273 protected void doStop() throws Exception { 274 ServiceHelper.stopServices(deadLetter, output); 275 } 276 277 }