001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.Predicate; 021 import org.apache.camel.Processor; 022 import org.apache.camel.model.OnExceptionDefinition; 023 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy; 024 import org.apache.camel.util.ExchangeHelper; 025 import org.apache.camel.util.MessageHelper; 026 import org.apache.camel.util.ServiceHelper; 027 028 /** 029 * Default error handler 030 * 031 * @version $Revision: 770599 $ 032 */ 033 public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor { 034 private Processor output; 035 036 public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) { 037 this.output = output; 038 setExceptionPolicy(exceptionPolicyStrategy); 039 } 040 041 @Override 042 public String toString() { 043 return "DefaultErrorHandler[" + output + "]"; 044 } 045 046 public boolean supportTransacted() { 047 return false; 048 } 049 050 public void process(Exchange exchange) throws Exception { 051 try { 052 output.process(exchange); 053 } catch (Exception e) { 054 exchange.setException(e); 055 } 056 057 // do not handle transacted exchanges as this error handler does not support it 058 boolean handle = true; 059 if (exchange.isTransacted() && !supportTransacted()) { 060 handle = false; 061 if (log.isDebugEnabled()) { 062 log.debug("This error handler does not support transacted exchanges." 063 + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId()); 064 } 065 } 066 067 if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) { 068 handleException(exchange); 069 } 070 } 071 072 private void handleException(Exchange exchange) throws Exception { 073 Exception e = exchange.getException(); 074 075 // store the original caused exception in a property, so we can restore it later 076 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 077 078 // find the error handler to use (if any) 079 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 080 if (exceptionPolicy != null) { 081 Predicate handledPredicate = exceptionPolicy.getHandledPolicy(); 082 083 Processor processor = exceptionPolicy.getErrorHandler(); 084 prepareExchangeBeforeOnException(exchange); 085 if (processor != null) { 086 deliverToFaultProcessor(exchange, processor); 087 } 088 prepareExchangeAfterOnException(exchange, handledPredicate); 089 } 090 } 091 092 private void prepareExchangeBeforeOnException(Exchange exchange) { 093 // okay lower the exception as we are handling it by onException 094 if (exchange.getException() != null) { 095 exchange.setException(null); 096 } 097 098 // clear rollback flags 099 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 100 101 // reset cached streams so they can be read again 102 MessageHelper.resetStreamCache(exchange.getIn()); 103 } 104 105 private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception { 106 failureProcessor.process(exchange); 107 } 108 109 private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) { 110 if (handledPredicate == null || !handledPredicate.matches(exchange)) { 111 if (log.isDebugEnabled()) { 112 log.debug("This exchange is not handled so its marked as failed: " + exchange); 113 } 114 // exception not handled, put exception back in the exchange 115 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 116 } else { 117 if (log.isDebugEnabled()) { 118 log.debug("This exchange is handled so its marked as not failed: " + exchange); 119 } 120 exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE); 121 } 122 } 123 124 /** 125 * Returns the output processor 126 */ 127 public Processor getOutput() { 128 return output; 129 } 130 131 protected void doStart() throws Exception { 132 ServiceHelper.startServices(output); 133 } 134 135 protected void doStop() throws Exception { 136 ServiceHelper.stopServices(output); 137 } 138 139 }