/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A base class for any kind of {@link Processor} which implements some kind of batch processing.
 * <p/>
 * Incoming exchanges are placed on an internal "in" queue by {@link #process(Exchange)}. A dedicated
 * sender thread moves them from that queue into the "out" collection supplied to the constructor and
 * then forwards the collected exchanges to the target processor, either one at a time or as a single
 * grouped exchange (see {@link #setGroupExchanges(boolean)}).
 *
 * @version $Revision: 791088 $
 */
public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {

    public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
    public static final int DEFAULT_BATCH_SIZE = 100;

    private static final Log LOG = LogFactory.getLog(BatchProcessor.class);

    private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
    // must always hold a value >= 1, the sender loop relies on it (see BatchSender#run)
    private int batchSize = DEFAULT_BATCH_SIZE;
    private int outBatchSize;
    private boolean groupExchanges;
    private boolean batchConsumer;

    private final Processor processor;
    private final Collection<Exchange> collection;
    private ExceptionHandler exceptionHandler;

    private final BatchSender sender;

    /**
     * Creates a batch processor.
     *
     * @param processor  the processor that receives the batched exchanges, must not be <tt>null</tt>
     * @param collection the "out" collection in which exchanges are gathered before being sent,
     *                   must not be <tt>null</tt>
     */
    public BatchProcessor(Processor processor, Collection<Exchange> collection) {
        ObjectHelper.notNull(processor, "processor");
        ObjectHelper.notNull(collection, "collection");
        this.processor = processor;
        this.collection = collection;
        this.sender = new BatchSender();
    }

    @Override
    public String toString() {
        return "BatchProcessor[to: " + processor + "]";
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Gets the exception handler used for exceptions thrown while sending a batch.
     * A {@link LoggingExceptionHandler} is created on first access if none has been set.
     */
    public ExceptionHandler getExceptionHandler() {
        if (exceptionHandler == null) {
            exceptionHandler = new LoggingExceptionHandler(getClass());
        }
        return exceptionHandler;
    }

    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
     * process before it is completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
     *
     * @param batchSize the size, a value of <tt>0</tt> or less disables size based completion so that
     *                  only the timeout triggers it
     */
    public void setBatchSize(int batchSize) {
        // setting batch size to 0 or negative is like disabling it, so we set it as the max value
        // as the code logic is dependent on a batch size having 1..n value
        if (batchSize <= 0) {
            LOG.debug("Disabling batch size, will only be triggered by timeout");
            this.batchSize = Integer.MAX_VALUE;
        } else {
            this.batchSize = batchSize;
        }
    }

    public int getOutBatchSize() {
        return outBatchSize;
    }

    /**
     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
     * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
     * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
     *
     * @param outBatchSize the size
     */
    public void setOutBatchSize(int outBatchSize) {
        this.outBatchSize = outBatchSize;
    }

    public long getBatchTimeout() {
        return batchTimeout;
    }

    /**
     * Sets the time the sender thread waits for a new exchange before it sends what it has collected so far.
     *
     * @param batchTimeout the timeout in milliseconds
     */
    public void setBatchTimeout(long batchTimeout) {
        this.batchTimeout = batchTimeout;
    }

    public boolean isGroupExchanges() {
        return groupExchanges;
    }

    /**
     * Sets whether the exchanges of a batch are sent as one grouped exchange, holding the individual
     * exchanges as a list in the {@link Exchange#GROUPED_EXCHANGE} property, instead of one at a time.
     */
    public void setGroupExchanges(boolean groupExchanges) {
        this.groupExchanges = groupExchanges;
    }

    public boolean isBatchConsumer() {
        return batchConsumer;
    }

    /**
     * Sets whether the batch size should be taken from the {@link Exchange#BATCH_SIZE} property
     * of the incoming exchanges.
     */
    public void setBatchConsumer(boolean batchConsumer) {
        this.batchConsumer = batchConsumer;
    }

    public Processor getProcessor() {
        return processor;
    }

    /**
     * Returns a list holding the target processor, or <tt>null</tt> if there is none.
     */
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        List<Processor> answer = new ArrayList<Processor>(1);
        answer.add(processor);
        return answer;
    }

    public boolean hasNext() {
        return processor != null;
    }

    /**
     * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
     * the in queue should be drained to the "out" collection.
     *
     * @param num the number of exchanges currently in the in queue
     */
    private boolean isInBatchCompleted(int num) {
        return num >= batchSize;
    }

    /**
     * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
     * the out collection should be sent.
     */
    private boolean isOutBatchCompleted() {
        if (outBatchSize == 0) {
            // out batch is disabled, so go ahead and send.
            return true;
        }
        return collection.size() > 0 && collection.size() >= outBatchSize;
    }

    /**
     * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
     * custom processing before or after an individual exchange is processed
     */
    protected void processExchange(Exchange exchange) throws Exception {
        processor.process(exchange);
    }

    protected void doStart() throws Exception {
        ServiceHelper.startServices(processor);
        sender.start();
    }

    protected void doStop() throws Exception {
        // interrupts the sender thread so it leaves its wait loop
        sender.cancel();
        ServiceHelper.stopServices(sender);
        ServiceHelper.stopServices(processor);
        collection.clear();
    }

    /**
     * Enqueues an exchange for later batch processing.
     * <p/>
     * When {@link #isBatchConsumer()} is enabled the batch size is adjusted to the value of the
     * {@link Exchange#BATCH_SIZE} property of the exchange. A missing or non positive value is ignored
     * and the current batch size is kept.
     */
    public void process(Exchange exchange) throws Exception {

        // if batch consumer is enabled then we need to adjust the batch size
        // with the size from the batch consumer
        if (isBatchConsumer()) {
            // keep the boxed type: the property may be absent and unboxing null would throw a NPE
            Integer size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
            // a batch size below 1 would make the sender thread spin forever while holding the queue lock,
            // as the in batch would always be regarded as completed but nothing would be drained
            if (size != null && size.intValue() > 0 && batchSize != size.intValue()) {
                batchSize = size.intValue();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Using batch consumer completion, so setting batch size to: " + batchSize);
                }
            }
        }

        sender.enqueueExchange(exchange);
    }

    /**
     * Sender thread for queued-up exchanges.
     */
    private class BatchSender extends Thread {

        // the "in" queue, only to be accessed with queueLock held
        private final Queue<Exchange> queue;
        private final Lock queueLock = new ReentrantLock();
        // set by enqueueExchange and reset by run, only to be accessed with queueLock held
        private boolean exchangeEnqueued;
        private final Condition exchangeEnqueuedCondition = queueLock.newCondition();

        public BatchSender() {
            super(ExecutorServiceHelper.getThreadName("Batch Sender"));
            this.queue = new LinkedList<Exchange>();
        }

        @Override
        public void run() {
            // Wait until one of either:
            // * an exchange being queued;
            // * the batch timeout expiring; or
            // * the thread being cancelled.
            //
            // If an exchange is queued then we need to determine whether the
            // batch is complete. If it is complete then we send out the batched
            // exchanges. Otherwise we move back into our wait state.
            //
            // If the batch times out then we send out the batched exchanges
            // collected so far.
            //
            // If we receive an interrupt then all blocking operations are
            // interrupted and our thread terminates.
            //
            // The goal of the following algorithm in terms of synchronisation
            // is to provide fine grained locking i.e. retaining the lock only
            // when required. Special consideration is given to releasing the
            // lock when calling an overridable method i.e. sendExchanges.
            // Unlocking is important as the process of sending out the exchanges
            // would otherwise block new exchanges from being queued.

            queueLock.lock();
            try {
                do {
                    try {
                        if (!exchangeEnqueued) {
                            exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
                        }

                        if (!exchangeEnqueued) {
                            // nothing new was enqueued while waiting, so move what is queued to the out collection
                            drainQueueTo(collection, batchSize);
                        } else {
                            exchangeEnqueued = false;
                            // move every completed in batch to the out collection,
                            // this loop only terminates because batchSize is always >= 1
                            while (isInBatchCompleted(queue.size())) {
                                drainQueueTo(collection, batchSize);
                            }

                            if (!isOutBatchCompleted()) {
                                // not enough collected yet, re-check the loop condition and wait again
                                continue;
                            }
                        }

                        // release the lock while sending so new exchanges can be enqueued meanwhile
                        queueLock.unlock();
                        try {
                            try {
                                sendExchanges();
                            } catch (Exception e) {
                                getExceptionHandler().handleException(e);
                            }
                        } finally {
                            // the outer finally block expects the lock to be held
                            queueLock.lock();
                        }

                    } catch (InterruptedException e) {
                        // interrupted while waiting, so terminate this thread
                        break;
                    }

                } while (isRunAllowed());

            } finally {
                queueLock.unlock();
            }
        }

        /**
         * Moves at most <tt>max</tt> exchanges from the in queue to the given target collection,
         * stopping early when the queue is empty.
         * <p/>
         * This method should be called with queueLock held
         */
        private void drainQueueTo(Collection<Exchange> target, int max) {
            for (int i = 0; i < max; ++i) {
                Exchange e = queue.poll();
                if (e != null) {
                    target.add(e);
                } else {
                    break;
                }
            }
        }

        /**
         * Interrupts this thread.
         */
        public void cancel() {
            interrupt();
        }

        /**
         * Adds the exchange to the in queue and signals the sender thread.
         */
        public void enqueueExchange(Exchange exchange) {
            queueLock.lock();
            try {
                queue.add(exchange);
                exchangeEnqueued = true;
                exchangeEnqueuedCondition.signal();
            } finally {
                queueLock.unlock();
            }
        }

        /**
         * Removes all exchanges from the out collection and passes them to
         * {@link BatchProcessor#processExchange(Exchange)}, either one at a time or, when grouping
         * is enabled, as a single exchange holding them as a list in the
         * {@link Exchange#GROUPED_EXCHANGE} property.
         */
        @SuppressWarnings("unchecked")
        private void sendExchanges() throws Exception {
            Exchange grouped = null;

            Iterator<Exchange> iter = collection.iterator();
            while (iter.hasNext()) {
                Exchange exchange = iter.next();
                // removed before processing, so an exchange that fails is not sent again
                iter.remove();
                if (!groupExchanges) {
                    // non grouped so process the exchange one at a time
                    processExchange(exchange);
                } else {
                    // grouped so add all exchanges into one group
                    if (grouped == null) {
                        grouped = new DefaultExchange(exchange);
                    }
                    List<Exchange> list = grouped.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                    if (list == null) {
                        list = new ArrayList<Exchange>();
                        grouped.setProperty(Exchange.GROUPED_EXCHANGE, list);
                    }
                    list.add(exchange);
                }
            }

            // and after adding process the single grouped exchange
            if (grouped != null) {
                processExchange(grouped);
            }
        }
    }

}