001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.loadbalancer; 018 019 import java.util.List; 020 021 import org.apache.camel.Exchange; 022 import org.apache.camel.Processor; 023 import org.apache.camel.util.ObjectHelper; 024 025 /** 026 * This FailOverLoadBalancer will failover to use next processor when an exception occured 027 */ 028 public class FailOverLoadBalancer extends LoadBalancerSupport { 029 030 private final List<Class> exceptions; 031 032 public FailOverLoadBalancer() { 033 this.exceptions = null; 034 } 035 036 public FailOverLoadBalancer(List<Class> exceptions) { 037 this.exceptions = exceptions; 038 for (Class type : exceptions) { 039 if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) { 040 throw new IllegalArgumentException("Class is not an instance of Trowable: " + type); 041 } 042 } 043 } 044 045 /** 046 * Should the given failed Exchange failover? 047 * 048 * @param exchange the exchange that failed 049 * @return <tt>true</tt> to failover 050 */ 051 protected boolean shouldFailOver(Exchange exchange) { 052 if (exchange.getException() != null) { 053 054 if (exceptions == null || exceptions.isEmpty()) { 055 // always failover if no exceptions defined 056 return true; 057 } 058 059 for (Class exception : exceptions) { 060 // will look in exception hierarchy 061 if (exchange.getException(exception) != null) { 062 return true; 063 } 064 } 065 } 066 067 return false; 068 } 069 070 public void process(Exchange exchange) throws Exception { 071 List<Processor> list = getProcessors(); 072 if (list.isEmpty()) { 073 throw new IllegalStateException("No processors available to process " + exchange); 074 } 075 076 int index = 0; 077 Processor processor = list.get(index); 078 079 // process the first time 080 processExchange(processor, exchange); 081 082 // loop while we should fail over 083 while (shouldFailOver(exchange)) { 084 index++; 085 if (index < list.size()) { 086 // try again but reset exception first 087 exchange.setException(null); 088 processor = list.get(index); 089 processExchange(processor, exchange); 090 } else { 091 // no more processors to try 092 break; 093 } 094 } 095 } 096 097 private void processExchange(Processor processor, Exchange exchange) { 098 if (processor == null) { 099 throw new IllegalStateException("No processors could be chosen to process " + exchange); 100 } 101 try { 102 processor.process(exchange); 103 } catch (Exception e) { 104 exchange.setException(e); 105 } 106 } 107 108 }