001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.processor;
019    
020    import org.apache.camel.Endpoint;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.Expression;
023    import org.apache.camel.PollingConsumer;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.LoggingExceptionHandler;
026    import org.apache.camel.impl.ServiceSupport;
027    import org.apache.camel.spi.ExceptionHandler;
028    import org.apache.camel.util.ExpressionComparator;
029    import org.apache.camel.util.ExpressionListComparator;
030    import org.apache.camel.util.ServiceHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    import java.util.Comparator;
035    import java.util.Iterator;
036    import java.util.List;
037    import java.util.Set;
038    import java.util.TreeSet;
039    
040    /**
041     * An implementation of the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
042     *
043     * @version $Revision: 1.1 $
044     */
045    public class Resequencer extends ServiceSupport implements Runnable {
046        private static final transient Log log = LogFactory.getLog(Resequencer.class);
047        private Endpoint endpoint;
048        private Processor processor;
049        private Set<Exchange> set;
050        private long batchTimeout = 1000L;
051        private int batchSize = 100;
052        private PollingConsumer consumer;
053        private ExceptionHandler exceptionHandler;
054    
055        public Resequencer(Endpoint endpoint, Processor processor, Expression<Exchange> expression) {
056            this(endpoint, processor, createSet(expression));
057        }
058    
059        public Resequencer(Endpoint endpoint, Processor processor, List<Expression<Exchange>> expressions) {
060            this(endpoint, processor, createSet(expressions));
061        }
062    
063        public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> set) {
064            this.endpoint = endpoint;
065            this.processor = processor;
066            this.set = set;
067        }
068    
069        @Override
070        public String toString() {
071            return "Resequencer[to: " + processor + "]";
072        }
073    
074        public void run() {
075            log.debug("Starting thread for " + this);
076            while (!isStopped() && !isStopping()) {
077                try {
078                    processBatch();
079                }
080                catch (Exception e) {
081                    getExceptionHandler().handleException(e);
082                }
083            }
084            set.clear();
085        }
086    
087        // Properties
088        //-------------------------------------------------------------------------
089        public ExceptionHandler getExceptionHandler() {
090            if (exceptionHandler == null) {
091                exceptionHandler = new LoggingExceptionHandler(getClass());
092            }
093            return exceptionHandler;
094        }
095    
096        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
097            this.exceptionHandler = exceptionHandler;
098        }
099    
100        public int getBatchSize() {
101            return batchSize;
102        }
103    
104        public void setBatchSize(int batchSize) {
105            this.batchSize = batchSize;
106        }
107    
108        public long getBatchTimeout() {
109            return batchTimeout;
110        }
111    
112        public void setBatchTimeout(long batchTimeout) {
113            this.batchTimeout = batchTimeout;
114        }
115    
116        // Implementation methods
117        //-------------------------------------------------------------------------
118    
119        /**
120         * A transactional method to process a batch of messages up to a timeout period
121         * or number of messages reached.
122         */
123        protected synchronized void processBatch() throws Exception {
124            long start = System.currentTimeMillis();
125            long end = start + batchTimeout;
126            for (int i = 0; i < batchSize; i++) {
127                long timeout = end - System.currentTimeMillis();
128    
129                Exchange exchange = consumer.receive(timeout);
130                if (exchange == null) {
131                    break;
132                }
133                set.add(exchange);
134            }
135    
136            if (log.isDebugEnabled()) {
137                log.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: " + set);
138            }
139    
140            // lets send the batch
141            Iterator<Exchange> iter = set.iterator();
142            while (iter.hasNext()) {
143                Exchange exchange = iter.next();
144                iter.remove();
145                processor.process(exchange);
146            }
147        }
148    
149        protected void doStart() throws Exception {
150            consumer = endpoint.createPollingConsumer();
151    
152            ServiceHelper.startServices(processor, consumer);
153    
154            Thread thread = new Thread(this, this + " Polling Thread");
155            thread.start();
156        }
157    
158        protected void doStop() throws Exception {
159            ServiceHelper.stopServices(consumer, processor);
160            consumer = null;
161        }
162    
163        protected static Set<Exchange> createSet(Expression<Exchange> expression) {
164            return createSet(new ExpressionComparator<Exchange>(expression));
165        }
166    
167        protected static Set<Exchange> createSet(List<Expression<Exchange>> expressions) {
168            if (expressions.size() == 1) {
169                return createSet(expressions.get(0));
170            }
171            return createSet(new ExpressionListComparator<Exchange>(expressions));
172        }
173    
174        protected static Set<Exchange> createSet(Comparator<? super Exchange> comparator) {
175            return new TreeSet<Exchange>(comparator);
176        }
177    }