001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.Callable; 020 import java.util.concurrent.ExecutorService; 021 import java.util.concurrent.Future; 022 023 import org.apache.camel.Exchange; 024 import org.apache.camel.Processor; 025 import org.apache.camel.WaitForTaskToComplete; 026 import org.apache.camel.util.ExchangeHelper; 027 import org.apache.camel.util.concurrent.ExecutorServiceHelper; 028 029 /** 030 * Threads processor that leverage a thread pool for processing exchanges. 031 * <p/> 032 * The original caller thread will receive a <tt>Future<Exchange></tt> in the OUT message body. 033 * It can then later use this handle to obtain the async response. 034 * <p/> 035 * Camel also provides type converters so you can just ask to get the desired object type and Camel 036 * will automatic wait for the async task to complete to return the response. 037 * 038 * @version $Revision: 780262 $ 039 */ 040 public class ThreadsProcessor extends DelegateProcessor implements Processor { 041 042 protected static final int DEFAULT_THREADPOOL_SIZE = 5; 043 protected ExecutorService executorService; 044 protected WaitForTaskToComplete waitForTaskToComplete; 045 046 public ThreadsProcessor(Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) { 047 super(output); 048 this.executorService = executorService; 049 this.waitForTaskToComplete = waitForTaskToComplete; 050 } 051 052 public void process(final Exchange exchange) throws Exception { 053 final Processor output = getProcessor(); 054 if (output == null) { 055 // no output then return 056 return; 057 } 058 059 // use a new copy of the exchange to route async and handover the on completion to the new copy 060 // so its the new copy that performs the on completion callback when its done 061 final Exchange copy = exchange.newCopy(true); 062 063 // let it execute async and return the Future 064 Callable<Exchange> task = createTask(output, copy); 065 066 // sumbit the task 067 Future<Exchange> future = getExecutorService().submit(task); 068 069 // compute if we should wait for task to complete or not 070 WaitForTaskToComplete wait = waitForTaskToComplete; 071 if (exchange.getIn().getHeader(Exchange.ASYNC_WAIT) != null) { 072 wait = exchange.getIn().getHeader(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class); 073 } 074 075 if (wait == WaitForTaskToComplete.Always) { 076 // wait for task to complete 077 Exchange response = future.get(); 078 ExchangeHelper.copyResults(exchange, response); 079 } else if (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange)) { 080 // wait for task to complete as we expect a reply 081 Exchange response = future.get(); 082 ExchangeHelper.copyResults(exchange, response); 083 } else { 084 // no we do not expect a reply so lets continue, set a handle to the future task 085 // in case end user need it later 086 exchange.getOut().setBody(future); 087 } 088 } 089 090 protected Callable<Exchange> createTask(final Processor output, final Exchange copy) { 091 return new Callable<Exchange>() { 092 public Exchange call() throws Exception { 093 // must use a copy of the original exchange for processing async 094 output.process(copy); 095 return copy; 096 } 097 }; 098 } 099 100 public ExecutorService getExecutorService() { 101 if (executorService == null) { 102 executorService = createExecutorService(); 103 } 104 return executorService; 105 } 106 107 protected ExecutorService createExecutorService() { 108 return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true); 109 } 110 111 protected void doStop() throws Exception { 112 super.doStop(); 113 if (executorService != null) { 114 executorService.shutdown(); 115 } 116 } 117 118 }