001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.concurrent.ExecutorService; 020 import java.util.concurrent.Executors; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlRootElement; 025 import javax.xml.bind.annotation.XmlTransient; 026 027 import org.apache.camel.Expression; 028 import org.apache.camel.Processor; 029 import org.apache.camel.builder.ExpressionClause; 030 import org.apache.camel.model.language.ExpressionDefinition; 031 import org.apache.camel.processor.Splitter; 032 import org.apache.camel.processor.aggregate.AggregationStrategy; 033 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 034 import org.apache.camel.spi.RouteContext; 035 036 /** 037 * Represents an XML <split/> element 038 * 039 * @version $Revision: 772172 $ 040 */ 041 @XmlRootElement(name = "split") 042 @XmlAccessorType(XmlAccessType.FIELD) 043 public class SplitDefinition extends ExpressionNode { 044 @XmlTransient 045 private AggregationStrategy aggregationStrategy; 046 @XmlTransient 047 private ExecutorService executorService; 048 @XmlAttribute(required = false) 049 private Boolean parallelProcessing; 050 @XmlAttribute(required = false) 051 private String strategyRef; 052 @XmlAttribute(required = false) 053 private String executorServiceRef; 054 @XmlAttribute(required = false) 055 private Boolean streaming = false; 056 057 public SplitDefinition() { 058 } 059 060 public SplitDefinition(Expression expression) { 061 super(expression); 062 } 063 064 public SplitDefinition(ExpressionDefinition expression) { 065 super(expression); 066 } 067 068 @Override 069 public String toString() { 070 return "Split[" + getExpression() + " -> " + getOutputs() + "]"; 071 } 072 073 @Override 074 public String getShortName() { 075 return "split"; 076 } 077 078 @Override 079 public Processor createProcessor(RouteContext routeContext) throws Exception { 080 Processor childProcessor = routeContext.createProcessor(this); 081 aggregationStrategy = createAggregationStrategy(routeContext); 082 executorService = createExecutorService(routeContext); 083 return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy, 084 isParallelProcessing(), executorService, streaming); 085 } 086 087 088 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 089 AggregationStrategy strategy = getAggregationStrategy(); 090 if (strategy == null && strategyRef != null) { 091 strategy = routeContext.lookup(strategyRef, AggregationStrategy.class); 092 } 093 if (strategy == null) { 094 // fallback to use latest 095 strategy = new UseLatestAggregationStrategy(); 096 } 097 return strategy; 098 } 099 100 private ExecutorService createExecutorService(RouteContext routeContext) { 101 if (executorServiceRef != null) { 102 executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); 103 } 104 if (executorService == null) { 105 // fall back and use default 106 executorService = Executors.newScheduledThreadPool(5); 107 } 108 return executorService; 109 } 110 111 // Fluent API 112 // ------------------------------------------------------------------------- 113 114 /** 115 * Set the expression that the splitter will use 116 * 117 * @return the builder 118 */ 119 public ExpressionClause<SplitDefinition> expression() { 120 return ExpressionClause.createAndSetExpression(this); 121 } 122 /** 123 * Set the aggregationStrategy 124 * 125 * @return the builder 126 */ 127 public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 128 setAggregationStrategy(aggregationStrategy); 129 return this; 130 } 131 132 /** 133 * Doing the splitting work in parallel 134 * 135 * @return the builder 136 */ 137 public SplitDefinition parallelProcessing() { 138 setParallelProcessing(true); 139 return this; 140 } 141 142 /** 143 * Set the splitting action's thread model 144 * 145 * @param parallelProcessing <tt>true</tt> to use a thread pool, if <tt>false</tt> then work is done in the 146 * calling thread. 147 * 148 * @return the builder 149 */ 150 public SplitDefinition parallelProcessing(boolean parallelProcessing) { 151 setParallelProcessing(parallelProcessing); 152 return this; 153 } 154 155 /** 156 * Enables streaming. 157 * See {@link SplitDefinition#setStreaming(boolean)} for more information 158 * 159 * @return the builder 160 */ 161 public SplitDefinition streaming() { 162 setStreaming(true); 163 return this; 164 } 165 166 /** 167 * Setting the executor service for executing the splitting action. 168 * 169 * @param executorService the executor service 170 * @return the builder 171 */ 172 public SplitDefinition executorService(ExecutorService executorService) { 173 setExecutorService(executorService); 174 return this; 175 } 176 177 public AggregationStrategy getAggregationStrategy() { 178 return aggregationStrategy; 179 } 180 181 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 182 this.aggregationStrategy = aggregationStrategy; 183 } 184 185 public boolean isParallelProcessing() { 186 return parallelProcessing != null ? parallelProcessing : false; 187 } 188 189 public void setParallelProcessing(boolean parallelProcessing) { 190 this.parallelProcessing = parallelProcessing; 191 } 192 193 /** 194 * The splitter should use streaming -- exchanges are being sent as the data for them becomes available. 195 * This improves throughput and memory usage, but it has a drawback: 196 * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property 197 * 198 * @return whether or not streaming should be used 199 */ 200 public boolean isStreaming() { 201 return streaming != null ? streaming : false; 202 } 203 204 public void setStreaming(boolean streaming) { 205 this.streaming = streaming; 206 } 207 208 public ExecutorService getExecutorService() { 209 return executorService; 210 } 211 212 public void setExecutorService(ExecutorService executorService) { 213 this.executorService = executorService; 214 } 215 }