001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.loadbalancer;
018    
019    import java.util.List;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.util.ObjectHelper;
026    
027    /**
028     * This FailOverLoadBalancer will failover to use next processor when an exception occured
029     */
030    public class FailOverLoadBalancer extends LoadBalancerSupport {
031    
032        private final Class failException;
033    
034        public FailOverLoadBalancer(Class throwable) {
035            if (ObjectHelper.isAssignableFrom(Throwable.class, throwable)) {
036                failException = throwable;
037            } else {
038                throw new IllegalArgumentException("Class is not an instance of Trowable: " + throwable);
039            }
040        }
041    
042        public FailOverLoadBalancer() {
043            this(Throwable.class);
044        }
045    
046        protected boolean isCheckedException(Exchange exchange) {
047            if (exchange.getException() != null) {
048                if (failException.isAssignableFrom(exchange.getException().getClass())) {
049                    return true;
050                }
051            }
052            return false;
053        }
054    
055        private void processExchange(Processor processor, Exchange exchange) {
056            if (processor == null) {
057                throw new IllegalStateException("No processors could be chosen to process " + exchange);
058            }
059            try {
060                processor.process(exchange);
061            } catch (Exception e) {
062                exchange.setException(e);
063            }
064        }
065    
066        public void process(Exchange exchange) throws Exception {
067            List<Processor> list = getProcessors();
068            if (list.isEmpty()) {
069                throw new IllegalStateException("No processors available to process " + exchange);
070            }
071            int index = 0;
072            Processor processor = list.get(index);
073            processExchange(processor, exchange);
074            while (isCheckedException(exchange)) {
075                exchange.setException(null);
076                index++;
077                if (index < list.size()) {
078                    processor = list.get(index);
079                    processExchange(processor, exchange);
080                } else {
081                    break;
082                }
083            }
084        }
085    
086        public boolean process(Exchange exchange, final AsyncCallback callback) {
087            return processExchange(0, exchange, callback);
088        }
089    
090        public boolean processExchange(final int index, final Exchange exchange, final AsyncCallback callback) {
091            boolean sync;
092    
093            List<Processor> list = getProcessors();
094            if (list.isEmpty()) {
095                throw new IllegalStateException("No processors available to process " + exchange);
096            }
097    
098            Processor processor = list.get(index);
099            if (processor == null) {
100                throw new IllegalStateException("No processors could be chosen to process " + exchange);
101            }
102            if (processor instanceof AsyncProcessor) {
103                AsyncProcessor asyncProcessor = (AsyncProcessor) processor;
104                sync = asyncProcessor.process(exchange, new AsyncCallback() {
105                    public void done(boolean doSync) {
106                        // check the exchange and call the FailOverProcessor
107                        if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
108                            exchange.setException(null);
109                            processExchange(index + 1, exchange, callback);
110                        } else {
111                            callback.done(doSync);
112                        }
113                    }
114                });
115            } else {
116                try {
117                    processor.process(exchange);
118                } catch (Exception ex) {
119                    exchange.setException(ex);
120                }
121                if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
122                    exchange.setException(null);
123                    processExchange(index + 1, exchange, callback);
124                }
125                sync = true;
126                callback.done(true);
127            }
128            return sync;
129        }
130    
131    }