001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.Callable; 020 import java.util.concurrent.ExecutorService; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.ExchangePattern; 024 import org.apache.camel.Predicate; 025 import org.apache.camel.Processor; 026 import org.apache.camel.impl.ServiceSupport; 027 import org.apache.camel.impl.SynchronizationAdapter; 028 import org.apache.camel.util.ServiceHelper; 029 import org.apache.camel.util.concurrent.ExecutorServiceHelper; 030 import org.apache.commons.logging.Log; 031 import org.apache.commons.logging.LogFactory; 032 033 /** 034 * @version $Revision: 792977 $ 035 */ 036 public class OnCompletionProcessor extends ServiceSupport implements Processor, Traceable { 037 038 private static final transient Log LOG = LogFactory.getLog(OnCompletionProcessor.class); 039 private ExecutorService executorService; 040 private Processor processor; 041 private boolean onCompleteOnly; 042 private boolean onFailureOnly; 043 private Predicate onWhen; 044 045 public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) { 046 this.processor = processor; 047 this.onCompleteOnly = onCompleteOnly; 048 this.onFailureOnly = onFailureOnly; 049 this.onWhen = onWhen; 050 } 051 052 protected void doStart() throws Exception { 053 ServiceHelper.startService(processor); 054 } 055 056 @Override 057 protected void doStop() throws Exception { 058 if (executorService != null) { 059 executorService.shutdown(); 060 } 061 ServiceHelper.stopService(processor); 062 } 063 064 public void process(Exchange exchange) throws Exception { 065 if (processor == null) { 066 return; 067 } 068 069 // register callback 070 exchange.getUnitOfWork().addSynchronization(new SynchronizationAdapter() { 071 @Override 072 public void onComplete(Exchange exchange) { 073 if (onFailureOnly) { 074 return; 075 } 076 077 if (onWhen != null && !onWhen.matches(exchange)) { 078 // predicate did not match so do not route the onComplete 079 return; 080 } 081 082 // must use a copy as we dont want it to cause side effects of the original exchange 083 final Exchange copy = prepareExchange(exchange); 084 085 getExecutorService().submit(new Callable<Exchange>() { 086 public Exchange call() throws Exception { 087 if (LOG.isDebugEnabled()) { 088 LOG.debug("Processing onComplete: " + copy); 089 } 090 processor.process(copy); 091 return copy; 092 } 093 }); 094 } 095 096 public void onFailure(Exchange exchange) { 097 if (onCompleteOnly) { 098 return; 099 } 100 101 if (onWhen != null && !onWhen.matches(exchange)) { 102 // predicate did not match so do not route the onComplete 103 return; 104 } 105 106 // must use a copy as we dont want it to cause side effects of the original exchange 107 final Exchange copy = prepareExchange(exchange); 108 // must remove exception otherwise onFaulure routing will fail as well 109 // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange 110 copy.setException(null); 111 112 getExecutorService().submit(new Callable<Exchange>() { 113 public Exchange call() throws Exception { 114 if (LOG.isDebugEnabled()) { 115 LOG.debug("Processing onFailure: " + copy); 116 } 117 118 processor.process(copy); 119 return copy; 120 } 121 }); 122 } 123 124 @Override 125 public String toString() { 126 if (!onCompleteOnly && !onFailureOnly) { 127 return "onCompleteOrFailure"; 128 } else if (onCompleteOnly) { 129 return "onCompleteOnly"; 130 } else { 131 return "onFailureOnly"; 132 } 133 } 134 }); 135 } 136 137 /** 138 * Prepares the {@link Exchange} to send as onCompletion. 139 * 140 * @param exchange the current exchange 141 * @return the exchange to be routed in onComplete 142 */ 143 protected Exchange prepareExchange(Exchange exchange) { 144 // must use a copy as we dont want it to cause side effects of the original exchange 145 final Exchange copy = exchange.copy(false); 146 // set MEP to InOnly as this wire tap is a fire and forget 147 copy.setPattern(ExchangePattern.InOnly); 148 // add a header flag to indicate its a on completion exchange 149 copy.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE); 150 return copy; 151 } 152 153 public ExecutorService getExecutorService() { 154 if (executorService == null) { 155 executorService = createExecutorService(); 156 } 157 return executorService; 158 } 159 160 private ExecutorService createExecutorService() { 161 return ExecutorServiceHelper.newScheduledThreadPool(5, this.toString(), true); 162 } 163 164 public void setExecutorService(ExecutorService executorService) { 165 this.executorService = executorService; 166 } 167 168 @Override 169 public String toString() { 170 return "OnCompletionProcessor[" + processor + "]"; 171 } 172 173 public String getTraceLabel() { 174 return "OnCompletion"; 175 } 176 }