001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.ArrayBlockingQueue;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.Executor;
022    import java.util.concurrent.RejectedExecutionException;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.ThreadPoolExecutor;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import org.apache.camel.AsyncCallback;
029    import org.apache.camel.AsyncProcessor;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Service;
032    import org.apache.camel.util.AsyncProcessorHelper;
033    
034    /**
035     * A processor that forces async processing of the exchange using a thread pool.
036     *
037     * @version $Revision: 723041 $
038     */
039    public class ThreadProcessor implements AsyncProcessor, Service {
040    
041        private Executor executor;
042        private long stackSize;
043        private ThreadGroup threadGroup;
044        private int priority = Thread.NORM_PRIORITY;
045        private boolean daemon = true;
046        private String name = "Thread Processor";
047        private BlockingQueue<Runnable> taskQueue;
048        private long keepAliveTime;
049        private int maxSize = 1;
050        private int coreSize = 1;
051        private final AtomicBoolean shutdown = new AtomicBoolean(true);
052        private boolean callerRunsWhenRejected = true;
053    
054        class ProcessCall implements Runnable {
055            private final Exchange exchange;
056            private final AsyncCallback callback;
057    
058            public ProcessCall(Exchange exchange, AsyncCallback callback) {
059                this.exchange = exchange;
060                this.callback = callback;
061            }
062    
063            public void run() {
064                if (shutdown.get()) {
065                    exchange.setException(new RejectedExecutionException());
066                }
067                callback.done(false);
068            }
069        }
070    
071        public void process(Exchange exchange) throws Exception {
072            AsyncProcessorHelper.process(this, exchange);
073        }
074    
075        public boolean process(final Exchange exchange, final AsyncCallback callback) {
076            if (shutdown.get()) {
077                throw new IllegalStateException("ThreadProcessor is not running.");
078            }
079            ProcessCall call = new ProcessCall(exchange, callback);
080            try {
081                executor.execute(call);
082                return false;
083            } catch (RejectedExecutionException e) {
084                if (callerRunsWhenRejected) {
085                    if (shutdown.get()) {
086                        exchange.setException(new RejectedExecutionException());
087                    } else {
088                        callback.done(true);
089                    }
090                } else {
091                    exchange.setException(e);
092                }
093                return true;
094            }
095        }
096    
097        public void start() throws Exception {
098            shutdown.set(false);
099            getExecutor();
100        }
101    
102        public void stop() throws Exception {
103            shutdown.set(true);
104            if (executor instanceof ThreadPoolExecutor) {
105                ((ThreadPoolExecutor)executor).shutdown();
106                ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS);
107            }
108        }
109    
110        public long getStackSize() {
111            return stackSize;
112        }
113    
114        public void setStackSize(long stackSize) {
115            this.stackSize = stackSize;
116        }
117    
118        public ThreadGroup getThreadGroup() {
119            return threadGroup;
120        }
121    
122        public void setThreadGroup(ThreadGroup threadGroup) {
123            this.threadGroup = threadGroup;
124        }
125    
126        public int getPriority() {
127            return priority;
128        }
129    
130        public void setPriority(int priority) {
131            this.priority = priority;
132        }
133    
134        public boolean isDaemon() {
135            return daemon;
136        }
137    
138        public void setDaemon(boolean daemon) {
139            this.daemon = daemon;
140        }
141    
142        public String getName() {
143            return name;
144        }
145    
146        public void setName(String name) {
147            this.name = name;
148        }
149    
150        public long getKeepAliveTime() {
151            return keepAliveTime;
152        }
153    
154        public void setKeepAliveTime(long keepAliveTime) {
155            this.keepAliveTime = keepAliveTime;
156        }
157    
158        public int getMaxSize() {
159            return maxSize;
160        }
161    
162        public void setMaxSize(int maxSize) {
163            this.maxSize = maxSize;
164        }
165    
166        public int getCoreSize() {
167            return coreSize;
168        }
169    
170        public void setCoreSize(int coreSize) {
171            this.coreSize = coreSize;
172        }
173    
174        public BlockingQueue<Runnable> getTaskQueue() {
175            if (taskQueue == null) {
176                taskQueue = new ArrayBlockingQueue<Runnable>(1000);
177            }
178            return taskQueue;
179        }
180    
181        public void setTaskQueue(BlockingQueue<Runnable> taskQueue) {
182            this.taskQueue = taskQueue;
183        }
184    
185        public Executor getExecutor() {
186            if (executor == null) {
187                executor = new ThreadPoolExecutor(getCoreSize(), getMaxSize(), getKeepAliveTime(), TimeUnit.MILLISECONDS, getTaskQueue(), new ThreadFactory() {
188                    public Thread newThread(Runnable runnable) {
189                        Thread thread;
190                        if (getStackSize() > 0) {
191                            thread = new Thread(getThreadGroup(), runnable, getName(), getStackSize());
192                        } else {
193                            thread = new Thread(getThreadGroup(), runnable, getName());
194                        }
195                        thread.setDaemon(isDaemon());
196                        thread.setPriority(getPriority());
197                        return thread;
198                    }
199                });
200            }
201            return executor;
202        }
203    
204        public void setExecutor(Executor executor) {
205            this.executor = executor;
206        }
207    
208        public boolean isCallerRunsWhenRejected() {
209            return callerRunsWhenRejected;
210        }
211    
212        public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
213            this.callerRunsWhenRejected = callerRunsWhenRejected;
214        }
215    
216    }