001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.Callable; 020 import java.util.concurrent.ExecutorService; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.ExchangePattern; 024 import org.apache.camel.Predicate; 025 import org.apache.camel.Processor; 026 import org.apache.camel.impl.ServiceSupport; 027 import org.apache.camel.impl.SynchronizationAdapter; 028 import org.apache.camel.util.ServiceHelper; 029 import org.apache.camel.util.concurrent.ExecutorServiceHelper; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 /** 034 * @version $Revision: 777808 $ 035 */ 036 public class OnCompletionProcessor extends ServiceSupport implements Processor { 037 038 private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class); 039 private ExecutorService executorService; 040 private Processor processor; 041 private boolean onComplete; 042 private boolean onFailure; 043 private Predicate onWhen; 044 045 public OnCompletionProcessor(Processor processor, boolean onComplete, boolean onFailure, Predicate onWhen) { 046 this.processor = processor; 047 this.onComplete = onComplete; 048 this.onFailure = onFailure; 049 this.onWhen = onWhen; 050 } 051 052 protected void doStart() throws Exception { 053 ServiceHelper.startService(processor); 054 } 055 056 @Override 057 protected void doStop() throws Exception { 058 if (executorService != null) { 059 executorService.shutdown(); 060 } 061 ServiceHelper.stopService(processor); 062 } 063 064 public void process(Exchange exchange) throws Exception { 065 if (processor == null) { 066 return; 067 } 068 069 if (!onComplete && !onFailure) { 070 // no need to register callbacks not to be used 071 return; 072 } 073 074 // register callback 075 exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() { 076 @Override 077 public void onComplete(Exchange exchange) { 078 if (!onComplete) { 079 return; 080 } 081 082 if (onWhen != null && !onWhen.matches(exchange)) { 083 // predicate did not match so do not route the onComplete 084 return; 085 } 086 087 // must use a copy as we dont want it to cause side effects of the original exchange 088 final Exchange copy = prepareExchange(exchange); 089 090 getExecutorService().submit(new Callable<Exchange>() { 091 public Exchange call() throws Exception { 092 if (LOG.isDebugEnabled()) { 093 LOG.debug("Processing onComplete: " + copy); 094 } 095 processor.process(copy); 096 return copy; 097 } 098 }); 099 } 100 101 public void onFailure(Exchange exchange) { 102 if (!onFailure) { 103 return; 104 } 105 106 if (onWhen != null && !onWhen.matches(exchange)) { 107 // predicate did not match so do not route the onComplete 108 return; 109 } 110 111 // must use a copy as we dont want it to cause side effects of the original exchange 112 final Exchange copy = prepareExchange(exchange); 113 // must remove exception otherwise onFaulure routing will fail as well 114 // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange 115 copy.setException(null); 116 117 getExecutorService().submit(new Callable<Exchange>() { 118 public Exchange call() throws Exception { 119 if (LOG.isDebugEnabled()) { 120 LOG.debug("Processing onFailure: " + copy); 121 } 122 123 processor.process(copy); 124 return copy; 125 } 126 }); 127 } 128 129 @Override 130 public String toString() { 131 if (onComplete && onFailure) { 132 return "onCompleteOrFailure"; 133 } else if (onComplete) { 134 return "onCompleteOnly"; 135 } else { 136 return "onFailureOnly"; 137 } 138 } 139 }); 140 } 141 142 /** 143 * Prepares the {@link Exchange} to send as onCompletion. 144 * 145 * @param exchange the current exchange 146 * @return the exchange to be routed in onComplete 147 */ 148 protected Exchange prepareExchange(Exchange exchange) { 149 // must use a copy as we dont want it to cause side effects of the original exchange 150 final Exchange copy = exchange.newCopy(false); 151 // set MEP to InOnly as this wire tap is a fire and forget 152 copy.setPattern(ExchangePattern.InOnly); 153 // add a header flag to indicate its a on completion exchange 154 copy.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE); 155 return copy; 156 } 157 158 public ExecutorService getExecutorService() { 159 if (executorService == null) { 160 executorService = createExecutorService(); 161 } 162 return executorService; 163 } 164 165 private ExecutorService createExecutorService() { 166 return ExecutorServiceHelper.newScheduledThreadPool(5, this.toString(), true); 167 } 168 169 public void setExecutorService(ExecutorService executorService) { 170 this.executorService = executorService; 171 } 172 173 @Override 174 public String toString() { 175 return "OnCompletionProcessor"; 176 } 177 }