001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.TimeUnit; 022 import java.util.concurrent.locks.Condition; 023 import java.util.concurrent.locks.Lock; 024 import java.util.concurrent.locks.ReentrantLock; 025 026 import org.apache.camel.Exchange; 027 import org.apache.camel.Navigate; 028 import org.apache.camel.Processor; 029 import org.apache.camel.impl.LoggingExceptionHandler; 030 import org.apache.camel.impl.ServiceSupport; 031 import org.apache.camel.processor.resequencer.ResequencerEngine; 032 import org.apache.camel.processor.resequencer.SequenceElementComparator; 033 import org.apache.camel.processor.resequencer.SequenceSender; 034 import org.apache.camel.spi.ExceptionHandler; 035 import org.apache.camel.util.ServiceHelper; 036 037 /** 038 * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The 039 * algorithm implemented by {@link ResequencerEngine} is based on the detection 040 * of gaps in a message stream rather than on a fixed batch size. Gap detection 041 * in combination with timeouts removes the constraint of having to know the 042 * number of messages of a sequence (i.e. the batch size) in advance. 043 * <p> 044 * Messages must contain a unique sequence number for which a predecessor and a 045 * successor is known. For example a message with the sequence number 3 has a 046 * predecessor message with the sequence number 2 and a successor message with 047 * the sequence number 4. The message sequence 2,3,5 has a gap because the 048 * sucessor of 3 is missing. The resequencer therefore has to retain message 5 049 * until message 4 arrives (or a timeout occurs). 050 * <p> 051 * Instances of this class poll for {@link Exchange}s from a given 052 * <code>endpoint</code>. Resequencing work and the delivery of messages to 053 * the next <code>processor</code> is done within the single polling thread. 054 * 055 * @author Martin Krasser 056 * 057 * @version $Revision: 769303 $ 058 * 059 * @see ResequencerEngine 060 */ 061 public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor, Navigate<Processor> { 062 063 private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; 064 065 private final ExceptionHandler exceptionHandler; 066 private final ResequencerEngine<Exchange> engine; 067 private final Processor processor; 068 private Delivery delivery; 069 private int capacity; 070 071 /** 072 * Creates a new {@link StreamResequencer} instance. 073 * 074 * @param processor next processor that processes re-ordered exchanges. 075 * @param comparator a sequence element comparator for exchanges. 076 */ 077 public StreamResequencer(Processor processor, SequenceElementComparator<Exchange> comparator) { 078 this.exceptionHandler = new LoggingExceptionHandler(getClass()); 079 this.engine = new ResequencerEngine<Exchange>(comparator); 080 this.engine.setSequenceSender(this); 081 this.processor = processor; 082 } 083 084 /** 085 * Returns this resequencer's exception handler. 086 */ 087 public ExceptionHandler getExceptionHandler() { 088 return exceptionHandler; 089 } 090 091 /** 092 * Returns the next processor. 093 */ 094 public Processor getProcessor() { 095 return processor; 096 } 097 098 /** 099 * Returns this resequencer's capacity. The capacity is the maximum number 100 * of exchanges that can be managed by this resequencer at a given point in 101 * time. If the capacity if reached, polling from the endpoint will be 102 * skipped for <code>timeout</code> milliseconds giving exchanges the 103 * possibility to time out and to be delivered after the waiting period. 104 * 105 * @return this resequencer's capacity. 106 */ 107 public int getCapacity() { 108 return capacity; 109 } 110 111 /** 112 * Returns this resequencer's timeout. This sets the resequencer engine's 113 * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is 114 * also used to define the polling timeout from the endpoint. 115 * 116 * @return this resequencer's timeout. (Processor) 117 * @see ResequencerEngine#setTimeout(long) 118 */ 119 public long getTimeout() { 120 return engine.getTimeout(); 121 } 122 123 public void setCapacity(int capacity) { 124 this.capacity = capacity; 125 } 126 127 public void setTimeout(long timeout) { 128 engine.setTimeout(timeout); 129 } 130 131 @Override 132 public String toString() { 133 return "StreamResequencer[to: " + processor + "]"; 134 } 135 136 @Override 137 protected void doStart() throws Exception { 138 ServiceHelper.startServices(processor); 139 delivery = new Delivery(); 140 engine.start(); 141 delivery.start(); 142 } 143 144 @Override 145 protected void doStop() throws Exception { 146 // let's stop everything in the reverse order 147 // no need to stop the worker thread -- it will stop automatically when this service is stopped 148 engine.stop(); 149 ServiceHelper.stopServices(processor); 150 } 151 152 /** 153 * Sends the <code>exchange</code> to the next <code>processor</code>. 154 * 155 * @param exchange exchange to send. 156 */ 157 public void sendElement(Exchange exchange) throws Exception { 158 processor.process(exchange); 159 } 160 161 public void process(Exchange exchange) throws Exception { 162 while (engine.size() >= capacity) { 163 Thread.sleep(getTimeout()); 164 } 165 engine.insert(exchange); 166 delivery.request(); 167 } 168 169 public boolean hasNext() { 170 return processor != null; 171 } 172 173 public List<Processor> next() { 174 if (!hasNext()) { 175 return null; 176 } 177 List<Processor> answer = new ArrayList<Processor>(1); 178 answer.add(processor); 179 return answer; 180 } 181 182 private class Delivery extends Thread { 183 184 private Lock deliveryRequestLock = new ReentrantLock(); 185 private Condition deliveryRequestCondition = deliveryRequestLock.newCondition(); 186 187 public Delivery() { 188 super("Resequencer Delivery Thread"); 189 } 190 191 @Override 192 public void run() { 193 while (true) { 194 try { 195 deliveryRequestLock.lock(); 196 try { 197 deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS); 198 } finally { 199 deliveryRequestLock.unlock(); 200 } 201 } catch (InterruptedException e) { 202 break; 203 } 204 try { 205 engine.deliver(); 206 } catch (Exception e) { 207 exceptionHandler.handleException(e); 208 } 209 } 210 } 211 212 public void cancel() { 213 interrupt(); 214 } 215 216 public void request() { 217 deliveryRequestLock.lock(); 218 try { 219 deliveryRequestCondition.signal(); 220 } finally { 221 deliveryRequestLock.unlock(); 222 } 223 } 224 225 } 226 227 }