1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
package org.apache.camel.component.cxf.transport; |
18 |
|
|
19 |
|
import java.io.ByteArrayInputStream; |
20 |
|
import java.io.IOException; |
21 |
|
import java.io.InputStream; |
22 |
|
import java.io.OutputStream; |
23 |
|
import java.util.logging.Level; |
24 |
|
import java.util.logging.Logger; |
25 |
|
|
26 |
|
import org.apache.camel.CamelContext; |
27 |
|
import org.apache.camel.Endpoint; |
28 |
|
import org.apache.camel.Exchange; |
29 |
|
import org.apache.camel.Processor; |
30 |
|
import org.apache.camel.Producer; |
31 |
|
import org.apache.cxf.Bus; |
32 |
|
import org.apache.cxf.common.logging.LogUtils; |
33 |
|
import org.apache.cxf.configuration.Configurable; |
34 |
|
import org.apache.cxf.io.CachedOutputStream; |
35 |
|
import org.apache.cxf.message.Message; |
36 |
|
import org.apache.cxf.message.MessageImpl; |
37 |
|
import org.apache.cxf.service.model.EndpointInfo; |
38 |
|
import org.apache.cxf.transport.AbstractConduit; |
39 |
|
import org.apache.cxf.transport.AbstractDestination; |
40 |
|
import org.apache.cxf.transport.Conduit; |
41 |
|
import org.apache.cxf.transport.ConduitInitiator; |
42 |
|
import org.apache.cxf.transport.MessageObserver; |
43 |
|
import org.apache.cxf.ws.addressing.EndpointReferenceType; |
44 |
|
import org.apache.cxf.wsdl.EndpointReferenceUtils; |
45 |
|
|
46 |
|
|
47 |
|
|
48 |
|
|
49 |
0 |
public class CamelDestination extends AbstractDestination implements Configurable { |
50 |
|
protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base"; |
51 |
0 |
private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class); |
52 |
|
CamelContext camelContext; |
53 |
|
String camelUri; |
54 |
|
final ConduitInitiator conduitInitiator; |
55 |
|
private CamelTransportBase base; |
56 |
|
private Endpoint endpoint; |
57 |
|
|
58 |
|
public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException { |
59 |
0 |
super(getTargetReference(info, bus), info); |
60 |
0 |
this.camelContext = camelContext; |
61 |
|
|
62 |
0 |
base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX); |
63 |
|
|
64 |
0 |
conduitInitiator = ci; |
65 |
|
|
66 |
0 |
initConfig(); |
67 |
0 |
} |
68 |
|
|
69 |
|
protected Logger getLogger() { |
70 |
0 |
return LOG; |
71 |
|
} |
72 |
|
|
73 |
|
|
74 |
|
|
75 |
|
|
76 |
|
|
77 |
|
protected Conduit getInbuiltBackChannel(Message inMessage) { |
78 |
0 |
return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage); |
79 |
|
} |
80 |
|
|
81 |
|
public void activate() { |
82 |
0 |
getLogger().log(Level.INFO, "CamelDestination activate().... "); |
83 |
|
|
84 |
|
try { |
85 |
0 |
getLogger().log(Level.FINE, "establishing Camel connection"); |
86 |
0 |
endpoint = camelContext.getEndpoint(camelUri); |
87 |
0 |
} catch (Exception ex) { |
88 |
0 |
getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex); |
89 |
0 |
} |
90 |
0 |
} |
91 |
|
|
92 |
|
public void deactivate() { |
93 |
0 |
base.close(); |
94 |
0 |
} |
95 |
|
|
96 |
|
public void shutdown() { |
97 |
0 |
getLogger().log(Level.FINE, "CamelDestination shutdown()"); |
98 |
0 |
this.deactivate(); |
99 |
0 |
} |
100 |
|
|
101 |
|
protected void incoming(Exchange exchange) { |
102 |
0 |
getLogger().log(Level.FINE, "server received request: ", exchange); |
103 |
|
|
104 |
0 |
byte[] bytes = base.unmarshal(exchange); |
105 |
|
|
106 |
|
|
107 |
0 |
MessageImpl inMessage = new MessageImpl(); |
108 |
0 |
inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes)); |
109 |
0 |
base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS); |
110 |
|
|
111 |
|
|
112 |
0 |
inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange); |
113 |
|
|
114 |
0 |
inMessage.setDestination(this); |
115 |
|
|
116 |
|
|
117 |
0 |
incomingObserver.onMessage(inMessage); |
118 |
0 |
} |
119 |
|
|
120 |
|
public String getBeanName() { |
121 |
0 |
return endpointInfo.getName().toString() + ".camel-destination"; |
122 |
|
} |
123 |
|
|
124 |
|
private void initConfig() { |
125 |
|
|
126 |
|
|
127 |
|
|
128 |
|
|
129 |
|
|
130 |
|
|
131 |
|
|
132 |
|
|
133 |
|
|
134 |
|
|
135 |
0 |
} |
136 |
|
|
137 |
0 |
protected class ConsumerProcessor implements Processor { |
138 |
|
public void process(Exchange exchange) { |
139 |
|
try { |
140 |
0 |
incoming(exchange); |
141 |
0 |
} catch (Throwable ex) { |
142 |
0 |
getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex); |
143 |
0 |
} |
144 |
0 |
} |
145 |
|
} |
146 |
|
|
147 |
|
|
148 |
|
protected class BackChannelConduit extends AbstractConduit { |
149 |
|
protected Message inMessage; |
150 |
|
|
151 |
0 |
BackChannelConduit(EndpointReferenceType ref, Message message) { |
152 |
0 |
super(ref); |
153 |
0 |
inMessage = message; |
154 |
0 |
} |
155 |
|
|
156 |
|
|
157 |
|
|
158 |
|
|
159 |
|
|
160 |
|
|
161 |
|
public void setMessageObserver(MessageObserver observer) { |
162 |
|
|
163 |
0 |
} |
164 |
|
|
165 |
|
|
166 |
|
|
167 |
|
|
168 |
|
|
169 |
|
|
170 |
|
|
171 |
|
public void prepare(Message message) throws IOException { |
172 |
|
|
173 |
0 |
message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE)); |
174 |
0 |
message.setContent(OutputStream.class, new CamelOutputStream(inMessage)); |
175 |
0 |
} |
176 |
|
|
177 |
|
protected Logger getLogger() { |
178 |
0 |
return LOG; |
179 |
|
} |
180 |
|
|
181 |
|
} |
182 |
|
|
183 |
0 |
private class CamelOutputStream extends CachedOutputStream { |
184 |
|
private Message inMessage; |
185 |
|
private Producer<Exchange> replyTo; |
186 |
|
private Producer<Exchange> sender; |
187 |
|
|
188 |
|
|
189 |
0 |
public CamelOutputStream(Message m) { |
190 |
0 |
super(); |
191 |
0 |
inMessage = m; |
192 |
0 |
} |
193 |
|
|
194 |
|
|
195 |
|
private void commitOutputMessage() throws IOException { |
196 |
|
|
197 |
|
|
198 |
0 |
final String replyToUri = getReplyToDestination(inMessage); |
199 |
|
|
200 |
0 |
base.template.send(replyToUri, new Processor() { |
201 |
0 |
public void process(Exchange reply) { |
202 |
0 |
base.marshal(currentStream.toString(), replyToUri, reply); |
203 |
|
|
204 |
0 |
setReplyCorrelationID(inMessage, reply); |
205 |
|
|
206 |
0 |
base.setMessageProperties(inMessage, reply); |
207 |
|
|
208 |
0 |
getLogger().log(Level.FINE, "just server sending reply: ", reply); |
209 |
0 |
} |
210 |
|
}); |
211 |
0 |
} |
212 |
|
|
213 |
|
@Override |
214 |
|
protected void doFlush() throws IOException { |
215 |
|
|
216 |
0 |
} |
217 |
|
|
218 |
|
@Override |
219 |
|
protected void doClose() throws IOException { |
220 |
0 |
commitOutputMessage(); |
221 |
0 |
} |
222 |
|
|
223 |
|
@Override |
224 |
|
protected void onWrite() throws IOException { |
225 |
|
|
226 |
0 |
} |
227 |
|
} |
228 |
|
|
229 |
|
protected String getReplyToDestination(Message inMessage) { |
230 |
0 |
if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) { |
231 |
0 |
return (String)inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO); |
232 |
|
} else { |
233 |
0 |
return base.getReplyDestination(); |
234 |
|
} |
235 |
|
} |
236 |
|
|
237 |
|
protected void setReplyCorrelationID(Message inMessage, Exchange reply) { |
238 |
0 |
Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID); |
239 |
0 |
if (value != null) { |
240 |
0 |
reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value); |
241 |
|
} |
242 |
0 |
} |
243 |
|
} |