001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Iterator;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.Queue;
025    import java.util.concurrent.TimeUnit;
026    import java.util.concurrent.locks.Condition;
027    import java.util.concurrent.locks.Lock;
028    import java.util.concurrent.locks.ReentrantLock;
029    
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Navigate;
032    import org.apache.camel.Processor;
033    import org.apache.camel.impl.GroupedExchange;
034    import org.apache.camel.impl.LoggingExceptionHandler;
035    import org.apache.camel.impl.ServiceSupport;
036    import org.apache.camel.spi.ExceptionHandler;
037    import org.apache.camel.util.ObjectHelper;
038    import org.apache.camel.util.ServiceHelper;
039    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    /**
044     * A base class for any kind of {@link Processor} which implements some kind of batch processing.
045     * 
046     * @version $Revision: 782594 $
047     */
048    public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
049    
050        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
051        public static final int DEFAULT_BATCH_SIZE = 100;
052    
053        private static final Log LOG = LogFactory.getLog(BatchProcessor.class);
054    
055        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
056        private int batchSize = DEFAULT_BATCH_SIZE;
057        private int outBatchSize;
058        private boolean groupExchanges;
059        private boolean batchConsumer;
060    
061        private final Processor processor;
062        private final Collection<Exchange> collection;
063        private ExceptionHandler exceptionHandler;
064    
065        private final BatchSender sender;
066    
067        public BatchProcessor(Processor processor, Collection<Exchange> collection) {
068            ObjectHelper.notNull(processor, "processor");
069            ObjectHelper.notNull(collection, "collection");
070            this.processor = processor;
071            this.collection = collection;
072            this.sender = new BatchSender();
073        }
074    
075        @Override
076        public String toString() {
077            return "BatchProcessor[to: " + processor + "]";
078        }
079    
080        // Properties
081        // -------------------------------------------------------------------------
082        public ExceptionHandler getExceptionHandler() {
083            if (exceptionHandler == null) {
084                exceptionHandler = new LoggingExceptionHandler(getClass());
085            }
086            return exceptionHandler;
087        }
088    
089        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
090            this.exceptionHandler = exceptionHandler;
091        }
092    
093        public int getBatchSize() {
094            return batchSize;
095        }
096    
097        /**
098         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
099         * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
100         * 
101         * @param batchSize the size
102         */
103        public void setBatchSize(int batchSize) {
104            // setting batch size to 0 or negative is like disabling it, so we set it as the max value
105            // as the code logic is dependt on a batch size having 1..n value
106            if (batchSize <= 0) {
107                LOG.debug("Disabling batch size, will only be triggered by timeout");
108                this.batchSize = Integer.MAX_VALUE;
109            } else {
110                this.batchSize = batchSize;
111            }
112        }
113    
114        public int getOutBatchSize() {
115            return outBatchSize;
116        }
117    
118        /**
119         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
120         * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
121         * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
122         * 
123         * @param outBatchSize the size
124         */
125        public void setOutBatchSize(int outBatchSize) {
126            this.outBatchSize = outBatchSize;
127        }
128    
129        public long getBatchTimeout() {
130            return batchTimeout;
131        }
132    
133        public void setBatchTimeout(long batchTimeout) {
134            this.batchTimeout = batchTimeout;
135        }
136    
137        public boolean isGroupExchanges() {
138            return groupExchanges;
139        }
140    
141        public void setGroupExchanges(boolean groupExchanges) {
142            this.groupExchanges = groupExchanges;
143        }
144    
145        public boolean isBatchConsumer() {
146            return batchConsumer;
147        }
148    
149        public void setBatchConsumer(boolean batchConsumer) {
150            this.batchConsumer = batchConsumer;
151        }
152    
153        public Processor getProcessor() {
154            return processor;
155        }
156    
157        public List<Processor> next() {
158            if (!hasNext()) {
159                return null;
160            }
161            List<Processor> answer = new ArrayList<Processor>(1);
162            answer.add(processor);
163            return answer;
164        }
165    
166        public boolean hasNext() {
167            return processor != null;
168        }
169    
170        /**
171         * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
172         * the in queue should be drained to the "out" collection.
173         */
174        private boolean isInBatchCompleted(int num) {
175            return num >= batchSize;
176        }
177    
178        /**
179         * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
180         * the out collection should be sent.
181         */
182        private boolean isOutBatchCompleted() {
183            if (outBatchSize == 0) {
184                // out batch is disabled, so go ahead and send.
185                return true;
186            }
187            return collection.size() > 0 && collection.size() >= outBatchSize;
188        }
189    
190        /**
191         * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
192         * custom processing before or after an individual exchange is processed
193         */
194        protected void processExchange(Exchange exchange) throws Exception {
195            processor.process(exchange);
196        }
197    
198        protected void doStart() throws Exception {
199            ServiceHelper.startServices(processor);
200            sender.start();
201        }
202    
203        protected void doStop() throws Exception {
204            sender.cancel();
205            ServiceHelper.stopServices(sender);
206            ServiceHelper.stopServices(processor);
207            collection.clear();
208        }
209    
210        /**
211         * Enqueues an exchange for later batch processing.
212         */
213        public void process(Exchange exchange) throws Exception {
214    
215            // if batch consumer is enabled then we need to adjust the batch size
216            // with the size from the batch consumer
217            if (isBatchConsumer()) {
218                int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
219                if (batchSize != size) {
220                    batchSize = size;
221                    if (LOG.isTraceEnabled()) {
222                        LOG.trace("Using batch consumer completion, so setting batch size to: " + batchSize);
223                    }
224                }
225            }
226    
227            sender.enqueueExchange(exchange);
228        }
229    
230        /**
231         * Sender thread for queued-up exchanges.
232         */
233        private class BatchSender extends Thread {
234    
235            private Queue<Exchange> queue;
236            private Lock queueLock = new ReentrantLock();
237            private boolean exchangeEnqueued;
238            private Condition exchangeEnqueuedCondition = queueLock.newCondition();
239    
240            public BatchSender() {
241                super(ExecutorServiceHelper.getThreadName("Batch Sender"));
242                this.queue = new LinkedList<Exchange>();
243            }
244    
245            @Override
246            public void run() {
247                // Wait until one of either:
248                // * an exchange being queued;
249                // * the batch timeout expiring; or
250                // * the thread being cancelled.
251                //
252                // If an exchange is queued then we need to determine whether the
253                // batch is complete. If it is complete then we send out the batched
254                // exchanges. Otherwise we move back into our wait state.
255                //
256                // If the batch times out then we send out the batched exchanges
257                // collected so far.
258                //
259                // If we receive an interrupt then all blocking operations are
260                // interrupted and our thread terminates.
261                //
262                // The goal of the following algorithm in terms of synchronisation
263                // is to provide fine grained locking i.e. retaining the lock only
264                // when required. Special consideration is given to releasing the
265                // lock when calling an overloaded method i.e. sendExchanges. 
266                // Unlocking is important as the process of sending out the exchanges
267                // would otherwise block new exchanges from being queued.
268    
269                queueLock.lock();
270                try {
271                    do {
272                        try {
273                            if (!exchangeEnqueued) {
274                                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
275                            }
276    
277                            if (!exchangeEnqueued) {
278                                drainQueueTo(collection, batchSize);
279                            } else {             
280                                exchangeEnqueued = false;
281                                while (isInBatchCompleted(queue.size())) {
282                                    drainQueueTo(collection, batchSize);
283                                }
284                                
285                                if (!isOutBatchCompleted()) {
286                                    continue;
287                                }
288                            }
289    
290                            queueLock.unlock();
291                            try {
292                                try {
293                                    sendExchanges();
294                                } catch (Exception e) {
295                                    getExceptionHandler().handleException(e);
296                                }
297                            } finally {
298                                queueLock.lock();
299                            }
300    
301                        } catch (InterruptedException e) {
302                            break;
303                        }
304    
305                    } while (isRunAllowed());
306    
307                } finally {
308                    queueLock.unlock();
309                }
310            }
311    
312            /**
313             * This method should be called with queueLock held
314             */
315            private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
316                for (int i = 0; i < batchSize; ++i) {
317                    Exchange e = queue.poll();
318                    if (e != null) {
319                        collection.add(e);
320                    } else {
321                        break;
322                    }
323                }
324            }
325    
326            public void cancel() {
327                interrupt();
328            }
329    
330            public void enqueueExchange(Exchange exchange) {
331                queueLock.lock();
332                try {
333                    queue.add(exchange);
334                    exchangeEnqueued = true;
335                    exchangeEnqueuedCondition.signal();
336                } finally {
337                    queueLock.unlock();
338                }
339            }
340    
341            private void sendExchanges() throws Exception {
342                GroupedExchange grouped = null;
343    
344                Iterator<Exchange> iter = collection.iterator();
345                while (iter.hasNext()) {
346                    Exchange exchange = iter.next();
347                    iter.remove();
348                    if (!groupExchanges) {
349                        // non grouped so process the exchange one at a time
350                        processExchange(exchange);
351                    } else {
352                        // grouped so add all exchanges into one group
353                        if (grouped == null) {
354                            grouped = new GroupedExchange(exchange.getContext());
355                        }
356                        grouped.addExchange(exchange);
357                    }
358                }
359    
360                // and after adding process the single grouped exchange
361                if (grouped != null) {
362                    processExchange(grouped);
363                }
364            }
365        }
366    
367    }