001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.loadbalancer; 018 019 import java.util.List; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.AsyncProcessor; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Processor; 025 import org.apache.camel.util.ObjectHelper; 026 027 /** 028 * This FailOverLoadBalancer will failover to use next processor when an exception occured 029 */ 030 public class FailOverLoadBalancer extends LoadBalancerSupport { 031 032 private final Class failException; 033 034 public FailOverLoadBalancer(Class throwable) { 035 if (ObjectHelper.isAssignableFrom(Throwable.class, throwable)) { 036 failException = throwable; 037 } else { 038 throw new IllegalArgumentException("Class is not an instance of Trowable: " + throwable); 039 } 040 } 041 042 public FailOverLoadBalancer() { 043 this(Throwable.class); 044 } 045 046 protected boolean isCheckedException(Exchange exchange) { 047 if (exchange.getException() != null) { 048 if (failException.isAssignableFrom(exchange.getException().getClass())) { 049 return true; 050 } 051 } 052 return false; 053 } 054 055 private void processExchange(Processor processor, Exchange exchange) { 056 if (processor == null) { 057 throw new IllegalStateException("No processors could be chosen to process " + exchange); 058 } 059 try { 060 processor.process(exchange); 061 } catch (Exception e) { 062 exchange.setException(e); 063 } 064 } 065 066 public void process(Exchange exchange) throws Exception { 067 List<Processor> list = getProcessors(); 068 if (list.isEmpty()) { 069 throw new IllegalStateException("No processors available to process " + exchange); 070 } 071 int index = 0; 072 Processor processor = list.get(index); 073 processExchange(processor, exchange); 074 while (isCheckedException(exchange)) { 075 exchange.setException(null); 076 index++; 077 if (index < list.size()) { 078 processor = list.get(index); 079 processExchange(processor, exchange); 080 } else { 081 break; 082 } 083 } 084 } 085 086 public boolean process(Exchange exchange, final AsyncCallback callback) { 087 return processExchange(0, exchange, callback); 088 } 089 090 public boolean processExchange(final int index, final Exchange exchange, final AsyncCallback callback) { 091 boolean sync; 092 093 List<Processor> list = getProcessors(); 094 if (list.isEmpty()) { 095 throw new IllegalStateException("No processors available to process " + exchange); 096 } 097 098 Processor processor = list.get(index); 099 if (processor == null) { 100 throw new IllegalStateException("No processors could be chosen to process " + exchange); 101 } 102 if (processor instanceof AsyncProcessor) { 103 AsyncProcessor asyncProcessor = (AsyncProcessor) processor; 104 sync = asyncProcessor.process(exchange, new AsyncCallback() { 105 public void done(boolean doSync) { 106 // check the exchange and call the FailOverProcessor 107 if (isCheckedException(exchange) && index < getProcessors().size() - 1) { 108 exchange.setException(null); 109 processExchange(index + 1, exchange, callback); 110 } else { 111 callback.done(doSync); 112 } 113 } 114 }); 115 } else { 116 try { 117 processor.process(exchange); 118 } catch (Exception ex) { 119 exchange.setException(ex); 120 } 121 if (isCheckedException(exchange) && index < getProcessors().size() - 1) { 122 exchange.setException(null); 123 processExchange(index + 1, exchange, callback); 124 } 125 sync = true; 126 callback.done(true); 127 } 128 return sync; 129 } 130 131 }