001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Predicate;
021    import org.apache.camel.Processor;
022    import org.apache.camel.model.OnExceptionDefinition;
023    import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
024    import org.apache.camel.util.ExchangeHelper;
025    import org.apache.camel.util.MessageHelper;
026    import org.apache.camel.util.ServiceHelper;
027    
028    /**
029     * Default error handler
030     *
031     * @version $Revision: 770599 $
032     */
033    public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor {
034        private Processor output;
035    
036        public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) {
037            this.output = output;
038            setExceptionPolicy(exceptionPolicyStrategy);
039        }
040    
041        @Override
042        public String toString() {
043            return "DefaultErrorHandler[" + output + "]";
044        }
045    
046        public boolean supportTransacted() {
047            return false;
048        }
049    
050        public void process(Exchange exchange) throws Exception {
051            try {
052                output.process(exchange);
053            } catch (Exception e) {
054                exchange.setException(e);
055            }
056    
057            // do not handle transacted exchanges as this error handler does not support it
058            boolean handle = true;
059            if (exchange.isTransacted() && !supportTransacted()) {
060                handle = false;
061                if (log.isDebugEnabled()) {
062                    log.debug("This error handler does not support transacted exchanges."
063                        + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
064                }
065            }
066    
067            if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
068                handleException(exchange);
069            }
070        }
071    
072        private void handleException(Exchange exchange) throws Exception {
073            Exception e = exchange.getException();
074    
075            // store the original caused exception in a property, so we can restore it later
076            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
077    
078            // find the error handler to use (if any)
079            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
080            if (exceptionPolicy != null) {
081                Predicate handledPredicate = exceptionPolicy.getHandledPolicy();
082    
083                Processor processor = exceptionPolicy.getErrorHandler();
084                prepareExchangeBeforeOnException(exchange);
085                if (processor != null) {
086                    deliverToFaultProcessor(exchange, processor);
087                }
088                prepareExchangeAfterOnException(exchange, handledPredicate);
089            }
090        }
091    
092        private void prepareExchangeBeforeOnException(Exchange exchange) {
093            // okay lower the exception as we are handling it by onException
094            if (exchange.getException() != null) {
095                exchange.setException(null);
096            }
097    
098            // clear rollback flags
099            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
100    
101            // reset cached streams so they can be read again
102            MessageHelper.resetStreamCache(exchange.getIn());
103        }
104    
105        private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception {
106            failureProcessor.process(exchange);
107        }
108    
109        private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) {
110            if (handledPredicate == null || !handledPredicate.matches(exchange)) {
111                if (log.isDebugEnabled()) {
112                    log.debug("This exchange is not handled so its marked as failed: " + exchange);
113                }
114                // exception not handled, put exception back in the exchange
115                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
116            } else {
117                if (log.isDebugEnabled()) {
118                    log.debug("This exchange is handled so its marked as not failed: " + exchange);
119                }
120                exchange.setProperty(Exchange.EXCEPTION_HANDLED, Boolean.TRUE);
121            }
122        }
123    
124        /**
125         * Returns the output processor
126         */
127        public Processor getOutput() {
128            return output;
129        }
130    
131        protected void doStart() throws Exception {
132            ServiceHelper.startServices(output);
133        }
134    
135        protected void doStop() throws Exception {
136            ServiceHelper.stopServices(output);
137        }
138    
139    }