1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
package org.apache.camel.processor; |
18 |
|
|
19 |
|
import java.util.Collection; |
20 |
|
import java.util.Iterator; |
21 |
|
|
22 |
|
import org.apache.camel.Endpoint; |
23 |
|
import org.apache.camel.Exchange; |
24 |
|
import org.apache.camel.PollingConsumer; |
25 |
|
import org.apache.camel.Processor; |
26 |
|
import org.apache.camel.impl.LoggingExceptionHandler; |
27 |
|
import org.apache.camel.impl.ServiceSupport; |
28 |
|
import org.apache.camel.spi.ExceptionHandler; |
29 |
|
import org.apache.camel.util.ServiceHelper; |
30 |
|
import org.apache.commons.logging.Log; |
31 |
|
import org.apache.commons.logging.LogFactory; |
32 |
|
|
33 |
|
|
34 |
|
|
35 |
|
|
36 |
|
|
37 |
|
|
38 |
|
|
39 |
|
public class BatchProcessor extends ServiceSupport implements Runnable { |
40 |
3 |
private static final transient Log LOG = LogFactory.getLog(Resequencer.class); |
41 |
|
private Endpoint endpoint; |
42 |
|
private Processor processor; |
43 |
|
private Collection<Exchange> collection; |
44 |
6 |
private long batchTimeout = 1000L; |
45 |
6 |
private int batchSize = 100; |
46 |
|
private PollingConsumer consumer; |
47 |
|
private ExceptionHandler exceptionHandler; |
48 |
|
|
49 |
6 |
public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) { |
50 |
6 |
this.endpoint = endpoint; |
51 |
6 |
this.processor = processor; |
52 |
6 |
this.collection = collection; |
53 |
6 |
} |
54 |
|
|
55 |
|
@Override |
56 |
|
public String toString() { |
57 |
0 |
return "BatchProcessor[to: " + processor + "]"; |
58 |
|
} |
59 |
|
|
60 |
|
public void run() { |
61 |
6 |
LOG.debug("Starting thread for " + this); |
62 |
15 |
while (!isStopped() && !isStopping()) { |
63 |
|
try { |
64 |
9 |
processBatch(); |
65 |
0 |
} catch (Exception e) { |
66 |
0 |
getExceptionHandler().handleException(e); |
67 |
9 |
} |
68 |
0 |
} |
69 |
6 |
collection.clear(); |
70 |
6 |
} |
71 |
|
|
72 |
|
|
73 |
|
|
74 |
|
public ExceptionHandler getExceptionHandler() { |
75 |
0 |
if (exceptionHandler == null) { |
76 |
0 |
exceptionHandler = new LoggingExceptionHandler(getClass()); |
77 |
|
} |
78 |
0 |
return exceptionHandler; |
79 |
|
} |
80 |
|
|
81 |
|
public void setExceptionHandler(ExceptionHandler exceptionHandler) { |
82 |
0 |
this.exceptionHandler = exceptionHandler; |
83 |
0 |
} |
84 |
|
|
85 |
|
public int getBatchSize() { |
86 |
0 |
return batchSize; |
87 |
|
} |
88 |
|
|
89 |
|
public void setBatchSize(int batchSize) { |
90 |
0 |
this.batchSize = batchSize; |
91 |
0 |
} |
92 |
|
|
93 |
|
public long getBatchTimeout() { |
94 |
0 |
return batchTimeout; |
95 |
|
} |
96 |
|
|
97 |
|
public void setBatchTimeout(long batchTimeout) { |
98 |
0 |
this.batchTimeout = batchTimeout; |
99 |
0 |
} |
100 |
|
|
101 |
|
public Endpoint getEndpoint() { |
102 |
0 |
return endpoint; |
103 |
|
} |
104 |
|
|
105 |
|
public Processor getProcessor() { |
106 |
12 |
return processor; |
107 |
|
} |
108 |
|
|
109 |
|
|
110 |
|
|
111 |
|
|
112 |
|
|
113 |
|
protected synchronized void processBatch() throws Exception { |
114 |
9 |
long start = System.currentTimeMillis(); |
115 |
9 |
long end = start + batchTimeout; |
116 |
321 |
for (int i = 0; i < batchSize; i++) { |
117 |
318 |
long timeout = end - System.currentTimeMillis(); |
118 |
|
|
119 |
318 |
Exchange exchange = consumer.receive(timeout); |
120 |
318 |
if (exchange == null) { |
121 |
6 |
break; |
122 |
|
} |
123 |
312 |
collection.add(exchange); |
124 |
|
} |
125 |
|
|
126 |
9 |
if (LOG.isDebugEnabled()) { |
127 |
0 |
LOG.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: " |
128 |
|
+ collection); |
129 |
|
} |
130 |
|
|
131 |
|
|
132 |
9 |
Iterator<Exchange> iter = collection.iterator(); |
133 |
24 |
while (iter.hasNext()) { |
134 |
15 |
Exchange exchange = iter.next(); |
135 |
15 |
iter.remove(); |
136 |
15 |
processExchange(exchange); |
137 |
15 |
} |
138 |
9 |
} |
139 |
|
|
140 |
|
|
141 |
|
|
142 |
|
|
143 |
|
|
144 |
|
|
145 |
|
protected void processExchange(Exchange exchange) throws Exception { |
146 |
15 |
processor.process(exchange); |
147 |
15 |
} |
148 |
|
|
149 |
|
protected void doStart() throws Exception { |
150 |
6 |
consumer = endpoint.createPollingConsumer(); |
151 |
|
|
152 |
6 |
ServiceHelper.startServices(processor, consumer); |
153 |
|
|
154 |
6 |
Thread thread = new Thread(this, this + " Polling Thread"); |
155 |
6 |
thread.start(); |
156 |
6 |
} |
157 |
|
|
158 |
|
protected void doStop() throws Exception { |
159 |
6 |
ServiceHelper.stopServices(consumer, processor); |
160 |
6 |
collection.clear(); |
161 |
6 |
} |
162 |
|
|
163 |
|
protected Collection<Exchange> getCollection() { |
164 |
0 |
return collection; |
165 |
|
} |
166 |
|
} |