001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.BlockingQueue; 022 import java.util.concurrent.Executor; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlElementRef; 028 import javax.xml.bind.annotation.XmlRootElement; 029 import javax.xml.bind.annotation.XmlTransient; 030 031 import org.apache.camel.Processor; 032 import org.apache.camel.builder.ErrorHandlerBuilder; 033 import org.apache.camel.processor.Pipeline; 034 import org.apache.camel.processor.ThreadProcessor; 035 import org.apache.camel.spi.RouteContext; 036 037 /** 038 * Represents an XML <thread/> element 039 * 040 * @version $Revision: 750806 $ 041 */ 042 @XmlRootElement(name = "thread") 043 @XmlAccessorType(XmlAccessType.FIELD) 044 public class ThreadDefinition extends ProcessorDefinition<ProcessorDefinition> { 045 @XmlAttribute(required = false) 046 private Integer coreSize = 1; 047 @XmlAttribute(required = false) 048 private Boolean daemon = Boolean.TRUE; 049 @XmlAttribute(required = false) 050 private Long keepAliveTime; 051 @XmlAttribute(required = false) 052 private Integer maxSize = 1; 053 @XmlAttribute(required = false) 054 private String name = "Thread Processor"; 055 @XmlAttribute(required = false) 056 private Integer priority = Thread.NORM_PRIORITY; 057 @XmlAttribute(required = false) 058 private Long stackSize; 059 @XmlElementRef 060 private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>(); 061 @XmlTransient 062 private BlockingQueue<Runnable> taskQueue; 063 @XmlTransient 064 private ThreadGroup threadGroup; 065 @XmlTransient 066 private Executor executor; 067 068 public ThreadDefinition() { 069 } 070 071 public ThreadDefinition(int coreSize) { 072 this.coreSize = coreSize; 073 this.maxSize = coreSize; 074 } 075 076 public ThreadDefinition(Executor executor) { 077 this.executor = executor; 078 } 079 080 @Override 081 public List<ProcessorDefinition> getOutputs() { 082 return outputs; 083 } 084 085 @Override 086 public String toString() { 087 return "Thread[" + name + "]"; 088 } 089 090 @Override 091 public String getShortName() { 092 return "thread"; 093 } 094 095 @Override 096 public Processor createProcessor(RouteContext routeContext) throws Exception { 097 ThreadProcessor thread = new ThreadProcessor(); 098 thread.setExecutor(executor); 099 if (coreSize != null) { 100 thread.setCoreSize(coreSize); 101 } 102 if (daemon != null) { 103 thread.setDaemon(daemon); 104 } 105 if (keepAliveTime != null) { 106 thread.setKeepAliveTime(keepAliveTime); 107 } 108 if (maxSize != null) { 109 thread.setMaxSize(maxSize); 110 } 111 thread.setName(name); 112 thread.setPriority(priority); 113 if (stackSize != null) { 114 thread.setStackSize(stackSize); 115 } 116 thread.setTaskQueue(taskQueue); 117 thread.setThreadGroup(threadGroup); 118 119 // TODO: see if we can avoid creating so many nested pipelines 120 ArrayList<Processor> pipe = new ArrayList<Processor>(2); 121 pipe.add(thread); 122 pipe.add(createOutputsProcessor(routeContext, outputs)); 123 return new Pipeline(pipe); 124 } 125 126 @Override 127 protected void configureChild(ProcessorDefinition output) { 128 super.configureChild(output); 129 if (isInheritErrorHandler()) { 130 output.setErrorHandlerBuilder(getErrorHandlerBuilder()); 131 } 132 } 133 134 // Fluent methods 135 // ----------------------------------------------------------------------- 136 @Override 137 public ProcessorDefinition errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 138 // do not support setting error handling on thread type as its confusing and will not be used 139 throw new IllegalArgumentException("Setting errorHandler on ThreadType is not supported." 140 + " Instead set the errorHandler on the parent."); 141 } 142 143 /** 144 * Sets the core size 145 * 146 * @param coreSize the core size 147 * @return the builder 148 */ 149 public ThreadDefinition coreSize(int coreSize) { 150 setCoreSize(coreSize); 151 return this; 152 } 153 154 /** 155 * Sets the deamon option 156 * 157 * @param daemon deamon option 158 * @return the builder 159 */ 160 public ThreadDefinition daemon(boolean daemon) { 161 setDaemon(daemon); 162 return this; 163 } 164 165 /** 166 * Sets the keep alive time 167 * 168 * @param keepAliveTime keep alive time in millis 169 * @return the builder 170 */ 171 public ThreadDefinition keepAliveTime(long keepAliveTime) { 172 setKeepAliveTime(keepAliveTime); 173 return this; 174 } 175 176 /** 177 * Sets the max pool size 178 * 179 * @param maxSize the max pool size 180 * @return the builder 181 */ 182 public ThreadDefinition maxSize(int maxSize) { 183 setMaxSize(maxSize); 184 return this; 185 } 186 187 /** 188 * Sets the thread pool name 189 * 190 * @param name the name 191 * @return the builder 192 */ 193 public ThreadDefinition name(String name) { 194 setName(name); 195 return this; 196 } 197 198 /** 199 * Sets the thread pool priority 200 * 201 * @param priority the priority 202 * @return the builder 203 */ 204 public ThreadDefinition priority(int priority) { 205 setPriority(priority); 206 return this; 207 } 208 209 /** 210 * Sets the thread pool stack size 211 * 212 * @param stackSize the stack size 213 * @return the builder 214 */ 215 public ThreadDefinition stackSize(long stackSize) { 216 setStackSize(stackSize); 217 return this; 218 } 219 220 /** 221 * Sets the task queue 222 * 223 * @param taskQueue the task queue 224 * @return the builder 225 */ 226 public ThreadDefinition taskQueue(BlockingQueue<Runnable> taskQueue) { 227 setTaskQueue(taskQueue); 228 return this; 229 } 230 231 /** 232 * Sets the thread group 233 * 234 * @param threadGroup the thread group 235 * @return the builder 236 */ 237 public ThreadDefinition threadGroup(ThreadGroup threadGroup) { 238 setThreadGroup(threadGroup); 239 return this; 240 } 241 242 /** 243 * Sets the execute to use 244 * 245 * @param executor the executor 246 * @return the builder 247 */ 248 public ThreadDefinition executor(Executor executor) { 249 setExecutor(executor); 250 return this; 251 } 252 253 // Property Accessors 254 // ----------------------------------------------------------------------- 255 256 public void setCoreSize(int coreSize) { 257 this.coreSize = coreSize; 258 } 259 260 public void setDaemon(boolean daemon) { 261 this.daemon = daemon; 262 } 263 264 public void setKeepAliveTime(long keepAliveTime) { 265 this.keepAliveTime = keepAliveTime; 266 } 267 268 public void setMaxSize(int maxSize) { 269 this.maxSize = maxSize; 270 } 271 272 public void setName(String name) { 273 this.name = name; 274 } 275 276 public void setPriority(int priority) { 277 this.priority = priority; 278 } 279 280 public void setStackSize(long stackSize) { 281 this.stackSize = stackSize; 282 } 283 284 public void setTaskQueue(BlockingQueue<Runnable> taskQueue) { 285 this.taskQueue = taskQueue; 286 } 287 288 public void setThreadGroup(ThreadGroup threadGroup) { 289 this.threadGroup = threadGroup; 290 } 291 292 public Executor getExecutor() { 293 return executor; 294 } 295 296 public void setExecutor(Executor executor) { 297 this.executor = executor; 298 } 299 }