001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.concurrent.Executor; 020 import java.util.concurrent.LinkedBlockingQueue; 021 import java.util.concurrent.ThreadPoolExecutor; 022 import java.util.concurrent.TimeUnit; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlRootElement; 028 import javax.xml.bind.annotation.XmlTransient; 029 030 import org.apache.camel.Expression; 031 import org.apache.camel.Processor; 032 import org.apache.camel.builder.ExpressionClause; 033 import org.apache.camel.model.language.ExpressionDefinition; 034 import org.apache.camel.processor.Splitter; 035 import org.apache.camel.processor.aggregate.AggregationStrategy; 036 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 037 import org.apache.camel.spi.RouteContext; 038 039 /** 040 * Represents an XML <split/> element 041 * 042 * @version $Revision: 751373 $ 043 */ 044 @XmlRootElement(name = "split") 045 @XmlAccessorType(XmlAccessType.FIELD) 046 public class SplitDefinition extends ExpressionNode { 047 @XmlTransient 048 private AggregationStrategy aggregationStrategy; 049 @XmlAttribute(required = false) 050 private Boolean parallelProcessing; 051 @XmlTransient 052 private Executor executor; 053 @XmlAttribute(required = false) 054 private String strategyRef; 055 @XmlAttribute(required = false) 056 private String threadPoolExecutorRef; 057 @XmlAttribute(required = false) 058 private Boolean streaming = false; 059 060 public SplitDefinition() { 061 } 062 063 public SplitDefinition(Expression expression) { 064 super(expression); 065 } 066 067 public SplitDefinition(ExpressionDefinition expression) { 068 super(expression); 069 } 070 071 @Override 072 public String toString() { 073 return "Split[" + getExpression() + " -> " + getOutputs() + "]"; 074 } 075 076 @Override 077 public String getShortName() { 078 return "split"; 079 } 080 081 @Override 082 public Processor createProcessor(RouteContext routeContext) throws Exception { 083 Processor childProcessor = routeContext.createProcessor(this); 084 aggregationStrategy = createAggregationStrategy(routeContext); 085 executor = createThreadPoolExecutor(routeContext); 086 return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy, 087 isParallelProcessing(), executor, streaming); 088 } 089 090 091 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 092 AggregationStrategy strategy = getAggregationStrategy(); 093 if (strategy == null && strategyRef != null) { 094 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class); 095 } 096 if (strategy == null) { 097 // fallback to use latest 098 strategy = new UseLatestAggregationStrategy(); 099 } 100 return strategy; 101 } 102 103 private Executor createThreadPoolExecutor(RouteContext routeContext) { 104 Executor executor = getExecutor(); 105 if (executor == null && threadPoolExecutorRef != null) { 106 executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class); 107 } 108 if (executor == null) { 109 // fall back and use default 110 executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); 111 } 112 return executor; 113 } 114 115 // Fluent API 116 // ------------------------------------------------------------------------- 117 118 /** 119 * Set the expression that the splitter will use 120 * 121 * @return the builder 122 */ 123 public ExpressionClause<SplitDefinition> expression() { 124 return ExpressionClause.createAndSetExpression(this); 125 } 126 /** 127 * Set the aggregationStrategy 128 * 129 * @return the builder 130 */ 131 public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 132 setAggregationStrategy(aggregationStrategy); 133 return this; 134 } 135 136 /** 137 * Doing the splitting work in parallel 138 * 139 * @return the builder 140 */ 141 public SplitDefinition parallelProcessing() { 142 setParallelProcessing(true); 143 return this; 144 } 145 146 /** 147 * Set the splitting action's thread model 148 * 149 * @param parallelProcessing <tt>true</tt> to use a thread pool, if <tt>false</tt> then work is done in the 150 * calling thread. 151 * 152 * @return the builder 153 */ 154 public SplitDefinition parallelProcessing(boolean parallelProcessing) { 155 setParallelProcessing(parallelProcessing); 156 return this; 157 } 158 159 /** 160 * Enables streaming. 161 * See {@link SplitDefinition#setStreaming(boolean)} for more information 162 * 163 * @return the builder 164 */ 165 public SplitDefinition streaming() { 166 setStreaming(true); 167 return this; 168 } 169 170 /** 171 * Setting the executor for executing the splitting action. 172 * 173 * @param executor the executor 174 * @return the builder 175 */ 176 public SplitDefinition executor(Executor executor) { 177 setExecutor(executor); 178 return this; 179 } 180 181 public AggregationStrategy getAggregationStrategy() { 182 return aggregationStrategy; 183 } 184 185 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 186 this.aggregationStrategy = aggregationStrategy; 187 } 188 189 public boolean isParallelProcessing() { 190 return parallelProcessing != null ? parallelProcessing : false; 191 } 192 193 public void setParallelProcessing(boolean parallelProcessing) { 194 this.parallelProcessing = parallelProcessing; 195 } 196 197 /** 198 * The splitter should use streaming -- exchanges are being sent as the data for them becomes available. 199 * This improves throughput and memory usage, but it has a drawback: 200 * - the sent exchanges will no longer contain the {@link Splitter#SPLIT_SIZE} header property 201 * 202 * @return whether or not streaming should be used 203 */ 204 public boolean isStreaming() { 205 return streaming != null ? streaming : false; 206 } 207 208 public void setStreaming(boolean streaming) { 209 this.streaming = streaming; 210 } 211 212 public Executor getExecutor() { 213 return executor; 214 } 215 216 public void setExecutor(Executor executor) { 217 this.executor = executor; 218 } 219 }