001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.RejectedExecutionException; 020 021 import org.apache.camel.Exchange; 022 import org.apache.camel.LoggingLevel; 023 import org.apache.camel.Message; 024 import org.apache.camel.Predicate; 025 import org.apache.camel.Processor; 026 import org.apache.camel.model.OnExceptionDefinition; 027 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; 028 import org.apache.camel.util.ExchangeHelper; 029 import org.apache.camel.util.MessageHelper; 030 import org.apache.camel.util.ServiceHelper; 031 032 /** 033 * Implements a <a 034 * href="http://camel.apache.org/dead-letter-channel.html">Dead Letter 035 * Channel</a> after attempting to redeliver the message using the 036 * {@link RedeliveryPolicy} 037 * 038 * @version $Revision: 774230 $ 039 */ 040 public class DeadLetterChannel extends ErrorHandlerSupport implements Processor { 041 042 // TODO: Introduce option to allow async redelivery, eg to not block thread while delay 043 // (eg the Timer task code). However we should consider using Channels that has internal 044 // producer/consumer queues with "delayed" support so a redelivery is just to move an 045 // exchange to this channel with the computed delay time 046 // we need to provide option so end users can decide if they would like to spawn an async thread 047 // or not. Also consider MEP as InOut does not work with async then as the original caller thread 048 // is expecting a reply in the sync thread. 049 050 // we can use a single shared static timer for async redeliveries 051 private final Processor deadLetter; 052 private final String deadLetterUri; 053 private final Processor output; 054 private final Processor redeliveryProcessor; 055 private final RedeliveryPolicy redeliveryPolicy; 056 private final Predicate handledPolicy; 057 private final Logger logger; 058 private final boolean useOriginalBodyPolicy; 059 060 private class RedeliveryData { 061 int redeliveryCounter; 062 long redeliveryDelay; 063 Predicate retryUntilPredicate; 064 065 // default behavior which can be overloaded on a per exception basis 066 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 067 Processor deadLetterQueue = deadLetter; 068 Processor onRedeliveryProcessor = redeliveryProcessor; 069 Predicate handledPredicate = handledPolicy; 070 boolean useOriginalInBody = useOriginalBodyPolicy; 071 } 072 073 /** 074 * Creates the dead letter channel. 075 * 076 * @param output outer processor that should use this dead letter channel 077 * @param deadLetter the failure processor to send failed exchanges to 078 * @param deadLetterUri an optional uri for logging purpose 079 * @param redeliveryProcessor an optional processor to run before redelivert attempt 080 * @param redeliveryPolicy policy for redelivery 081 * @param logger logger to use for logging failures and redelivery attempts 082 * @param exceptionPolicyStrategy strategy for onException handling 083 * @param handledPolicy policy for handling failed exception that are moved to the dead letter queue 084 * @param useOriginalBodyPolicy should the original IN body be moved to the dead letter queue or the current exchange IN body? 085 */ 086 public DeadLetterChannel(Processor output, Processor deadLetter, String deadLetterUri, Processor redeliveryProcessor, 087 RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy, 088 Predicate handledPolicy, boolean useOriginalBodyPolicy) { 089 this.output = output; 090 this.deadLetter = deadLetter; 091 this.deadLetterUri = deadLetterUri; 092 this.redeliveryProcessor = redeliveryProcessor; 093 this.redeliveryPolicy = redeliveryPolicy; 094 this.logger = logger; 095 this.handledPolicy = handledPolicy; 096 this.useOriginalBodyPolicy = useOriginalBodyPolicy; 097 setExceptionPolicy(exceptionPolicyStrategy); 098 } 099 100 @Override 101 public String toString() { 102 return "DeadLetterChannel[" + output + ", " + (deadLetterUri != null ? deadLetterUri : deadLetter) + "]"; 103 } 104 105 public boolean supportTransacted() { 106 return false; 107 } 108 109 public void process(Exchange exchange) throws Exception { 110 processErrorHandler(exchange, new RedeliveryData()); 111 } 112 113 /** 114 * Processes the exchange decorated with this dead letter channel. 115 */ 116 protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) { 117 118 while (true) { 119 // we can't keep retrying if the route is being shutdown. 120 if (!isRunAllowed()) { 121 if (log.isDebugEnabled()) { 122 log.debug("Rejected execution as we are not started for exchange: " + exchange); 123 } 124 if (exchange.getException() == null) { 125 exchange.setException(new RejectedExecutionException()); 126 return; 127 } 128 } 129 130 // do not handle transacted exchanges that failed as this error handler does not support it 131 if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) { 132 if (log.isDebugEnabled()) { 133 log.debug("This error handler does not support transacted exchanges." 134 + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId()); 135 } 136 return; 137 } 138 139 // did previous processing caused an exception? 140 if (exchange.getException() != null) { 141 handleException(exchange, data); 142 } 143 144 // compute if we should redeliver or not 145 boolean shouldRedeliver = shouldRedeliver(exchange, data); 146 if (!shouldRedeliver) { 147 // no then move it to the dead letter queue 148 deliverToDeadLetterQueue(exchange, data); 149 // and we are finished since the exchanged was moved to the dead letter queue 150 return; 151 } 152 153 // if we are redelivering then sleep before trying again 154 if (data.redeliveryCounter > 0) { 155 prepareExchangeForRedelivery(exchange); 156 157 // wait until we should redeliver 158 try { 159 data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay, data.redeliveryCounter); 160 } catch (InterruptedException e) { 161 log.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped())); 162 // continue from top 163 continue; 164 } 165 166 // letting onRedeliver be executed 167 deliverToRedeliveryProcessor(exchange, data); 168 } 169 170 // process the exchange 171 try { 172 output.process(exchange); 173 } catch (Exception e) { 174 exchange.setException(e); 175 } 176 177 // only process if the exchange hasn't failed 178 // and it has not been handled by the error processor 179 boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange); 180 if (done) { 181 return; 182 } 183 // error occurred so loop back around..... 184 } 185 186 } 187 188 // Properties 189 // ------------------------------------------------------------------------- 190 191 /** 192 * Returns the output processor 193 */ 194 public Processor getOutput() { 195 return output; 196 } 197 198 /** 199 * Returns the dead letter that message exchanges will be sent to if the 200 * redelivery attempts fail 201 */ 202 public Processor getDeadLetter() { 203 return deadLetter; 204 } 205 206 public RedeliveryPolicy getRedeliveryPolicy() { 207 return redeliveryPolicy; 208 } 209 210 public Logger getLogger() { 211 return logger; 212 } 213 214 // Implementation methods 215 // ------------------------------------------------------------------------- 216 217 private void prepareExchangeForRedelivery(Exchange exchange) { 218 // okay we will give it another go so clear the exception so we can try again 219 if (exchange.getException() != null) { 220 exchange.setException(null); 221 } 222 223 // clear rollback flags 224 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 225 226 // reset cached streams so they can be read again 227 MessageHelper.resetStreamCache(exchange.getIn()); 228 } 229 230 private void handleException(Exchange exchange, RedeliveryData data) { 231 Throwable e = exchange.getException(); 232 233 // store the original caused exception in a property, so we can restore it later 234 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 235 236 // find the error handler to use (if any) 237 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 238 if (exceptionPolicy != null) { 239 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 240 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 241 data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy(); 242 data.useOriginalInBody = exceptionPolicy.getUseOriginalBodyPolicy(); 243 244 // route specific failure handler? 245 Processor processor = exceptionPolicy.getErrorHandler(); 246 if (processor != null) { 247 data.deadLetterQueue = processor; 248 } 249 // route specific on redelivey? 250 processor = exceptionPolicy.getOnRedelivery(); 251 if (processor != null) { 252 data.onRedeliveryProcessor = processor; 253 } 254 } 255 256 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId() 257 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 258 logFailedDelivery(true, exchange, msg, data, e); 259 260 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e); 261 } 262 263 /** 264 * Gives an optional configure redelivery processor a chance to process before the Exchange 265 * will be redelivered. This can be used to alter the Exchange. 266 */ 267 private void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 268 if (data.onRedeliveryProcessor == null) { 269 return; 270 } 271 272 if (log.isTraceEnabled()) { 273 log.trace("RedeliveryProcessor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered"); 274 } 275 276 try { 277 data.onRedeliveryProcessor.process(exchange); 278 } catch (Exception e) { 279 exchange.setException(e); 280 } 281 log.trace("Redelivery processor done"); 282 } 283 284 /** 285 * All redelivery attempts failed so move the exchange to the dead letter queue 286 */ 287 private void deliverToDeadLetterQueue(final Exchange exchange, final RedeliveryData data) { 288 if (data.deadLetterQueue == null) { 289 return; 290 } 291 292 // we did not success with the redelivery so now we let the failure processor handle it 293 ExchangeHelper.setFailureHandled(exchange); 294 // must decrement the redelivery counter as we didn't process the redelivery but is 295 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 296 decrementRedeliveryCounter(exchange); 297 // reset cached streams so they can be read again 298 MessageHelper.resetStreamCache(exchange.getIn()); 299 300 // prepare original IN body if it should be moved instead of current body 301 if (data.useOriginalInBody) { 302 if (log.isTraceEnabled()) { 303 log.trace("Using the original IN body in the DedLetterQueue instead of the current IN body"); 304 } 305 306 Object original = exchange.getUnitOfWork().getOriginalInBody(); 307 exchange.getIn().setBody(original); 308 } 309 310 if (log.isTraceEnabled()) { 311 log.trace("DeadLetterQueue " + data.deadLetterQueue + " is processing Exchange: " + exchange); 312 } 313 try { 314 data.deadLetterQueue.process(exchange); 315 } catch (Exception e) { 316 exchange.setException(e); 317 } 318 log.trace("DedLetterQueue processor done"); 319 320 prepareExchangeAfterMovedToDeadLetterQueue(exchange, data.handledPredicate); 321 322 String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId() 323 + ". Moved to the dead letter queue: " + data.deadLetterQueue; 324 logFailedDelivery(false, exchange, msg, data, null); 325 } 326 327 private void prepareExchangeAfterMovedToDeadLetterQueue(Exchange exchange, Predicate handledPredicate) { 328 if (handledPredicate == null || !handledPredicate.matches(exchange)) { 329 if (log.isDebugEnabled()) { 330 log.debug("This exchange is not handled so its marked as failed: " + exchange); 331 } 332 // exception not handled, put exception back in the exchange 333 exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.FALSE); 334 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 335 } else { 336 if (log.isDebugEnabled()) { 337 log.debug("This exchange is handled so its marked as not failed: " + exchange); 338 } 339 exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE); 340 } 341 } 342 343 private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, String message, RedeliveryData data, Throwable e) { 344 LoggingLevel newLogLevel; 345 if (shouldRedeliver) { 346 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 347 } else { 348 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 349 } 350 if (exchange.isRollbackOnly()) { 351 String msg = "Rollback exchange"; 352 if (exchange.getException() != null) { 353 msg = msg + " due: " + exchange.getException().getMessage(); 354 } 355 if (newLogLevel == LoggingLevel.ERROR || newLogLevel == LoggingLevel.FATAL) { 356 // log intented rollback on maximum WARN level (no ERROR or FATAL) 357 logger.log(msg, LoggingLevel.WARN); 358 } else { 359 // otherwise use the desired logging level 360 logger.log(msg, newLogLevel); 361 } 362 } else if (data.currentRedeliveryPolicy.isLogStackTrace() && e != null) { 363 logger.log(message, e, newLogLevel); 364 } else { 365 logger.log(message, newLogLevel); 366 } 367 } 368 369 private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) { 370 return data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryUntilPredicate); 371 } 372 373 /** 374 * Increments the redelivery counter and adds the redelivered flag if the 375 * message has been redelivered 376 */ 377 private int incrementRedeliveryCounter(Exchange exchange, Throwable e) { 378 Message in = exchange.getIn(); 379 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 380 int next = 1; 381 if (counter != null) { 382 next = counter + 1; 383 } 384 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 385 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 386 return next; 387 } 388 389 /** 390 * Prepares the redelivery counter and boolean flag for the failure handle processor 391 */ 392 private void decrementRedeliveryCounter(Exchange exchange) { 393 Message in = exchange.getIn(); 394 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 395 if (counter != null) { 396 int prev = counter - 1; 397 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 398 // set boolean flag according to counter 399 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 400 } else { 401 // not redelivered 402 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 403 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 404 } 405 } 406 407 @Override 408 protected void doStart() throws Exception { 409 ServiceHelper.startServices(output, deadLetter); 410 } 411 412 @Override 413 protected void doStop() throws Exception { 414 ServiceHelper.stopServices(deadLetter, output); 415 } 416 417 }