001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Future;
022    
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.WaitForTaskToComplete;
026    import org.apache.camel.util.ExchangeHelper;
027    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
028    
029    /**
030     * Threads processor that leverage a thread pool for processing exchanges.
031     * <p/>
032     * The original caller thread will receive a <tt>Future&lt;Exchange&gt;</tt> in the OUT message body.
033     * It can then later use this handle to obtain the async response.
034     * <p/>
035     * Camel also provides type converters so you can just ask to get the desired object type and Camel
036     * will automatic wait for the async task to complete to return the response.
037     *
038     * @version $Revision: 780262 $
039     */
040    public class ThreadsProcessor extends DelegateProcessor implements Processor {
041    
042        protected static final int DEFAULT_THREADPOOL_SIZE = 5;
043        protected ExecutorService executorService;
044        protected WaitForTaskToComplete waitForTaskToComplete;
045    
046        public ThreadsProcessor(Processor output, ExecutorService executorService, WaitForTaskToComplete waitForTaskToComplete) {
047            super(output);
048            this.executorService = executorService;
049            this.waitForTaskToComplete = waitForTaskToComplete;
050        }
051    
052        public void process(final Exchange exchange) throws Exception {
053            final Processor output = getProcessor();
054            if (output == null) {
055                // no output then return
056                return;
057            }
058    
059            // use a new copy of the exchange to route async and handover the on completion to the new copy
060            // so its the new copy that performs the on completion callback when its done
061            final Exchange copy = exchange.newCopy(true);
062    
063            // let it execute async and return the Future
064            Callable<Exchange> task = createTask(output, copy);
065    
066            // sumbit the task
067            Future<Exchange> future = getExecutorService().submit(task);
068    
069            // compute if we should wait for task to complete or not
070            WaitForTaskToComplete wait = waitForTaskToComplete;
071            if (exchange.getIn().getHeader(Exchange.ASYNC_WAIT) != null) {
072                wait = exchange.getIn().getHeader(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
073            }
074    
075            if (wait == WaitForTaskToComplete.Always) {
076                // wait for task to complete
077                Exchange response = future.get();
078                ExchangeHelper.copyResults(exchange, response);
079            } else if (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange)) {
080                // wait for task to complete as we expect a reply
081                Exchange response = future.get();
082                ExchangeHelper.copyResults(exchange, response);
083            } else {
084                // no we do not expect a reply so lets continue, set a handle to the future task
085                // in case end user need it later
086                exchange.getOut().setBody(future);
087            }
088        }
089    
090        protected Callable<Exchange> createTask(final Processor output, final Exchange copy) {
091            return new Callable<Exchange>() {
092                public Exchange call() throws Exception {
093                    // must use a copy of the original exchange for processing async
094                    output.process(copy);
095                    return copy;
096                }
097            };
098        }
099    
100        public ExecutorService getExecutorService() {
101            if (executorService == null) {
102                executorService = createExecutorService();
103            }
104            return executorService;
105        }
106    
107        protected ExecutorService createExecutorService() {
108            return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true);
109        }
110    
111        protected void doStop() throws Exception {
112            super.doStop();
113            if (executorService != null) {
114                executorService.shutdown();
115            }
116        }
117    
118    }