001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.cxf.transport;
019    
020    import org.apache.camel.CamelContext;
021    import org.apache.camel.Processor;
022    import org.apache.cxf.Bus;
023    import org.apache.cxf.common.logging.LogUtils;
024    import org.apache.cxf.configuration.Configurable;
025    import org.apache.cxf.configuration.Configurer;
026    import org.apache.cxf.io.AbstractCachedOutputStream;
027    import org.apache.cxf.message.Message;
028    import org.apache.cxf.message.MessageImpl;
029    import org.apache.cxf.service.model.EndpointInfo;
030    import org.apache.cxf.transport.AbstractConduit;
031    import org.apache.cxf.transport.Conduit;
032    import org.apache.cxf.transport.Destination;
033    import org.apache.cxf.transport.MessageObserver;
034    import org.apache.cxf.ws.addressing.AttributedURIType;
035    import org.apache.cxf.ws.addressing.EndpointReferenceType;
036    
037    import java.io.ByteArrayInputStream;
038    import java.io.ByteArrayOutputStream;
039    import java.io.IOException;
040    import java.io.InputStream;
041    import java.io.OutputStream;
042    import java.util.logging.Level;
043    import java.util.logging.Logger;
044    
045    /**
046     * @version $Revision: 535478 $
047     */
048    public class CamelConduit extends AbstractConduit implements Configurable {
049        protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit-base";
050        private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class);
051        private final CamelTransportBase base;
052        private String targetCamelEndpointUri;
053    
054    /*
055        protected ClientConfig clientConfig;
056        protected ClientBehaviorPolicyType runtimePolicy;
057        protected AddressType address;
058        protected SessionPoolType sessionPool;
059    */
060    
061        public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType targetReference) {
062            super(targetReference);
063            AttributedURIType address = targetReference.getAddress();
064            if (address != null) {
065                this.targetCamelEndpointUri = address.getValue();
066            }
067    
068            base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
069    
070            initConfig();
071        }
072    
073        // prepare the message for send out , not actually send out the message
074        public void prepare(Message message) throws IOException {
075            getLogger().log(Level.FINE, "CamelConduit send message");
076    
077            message.setContent(OutputStream.class,
078                    new CamelOutputStream(message));
079        }
080    
081        public void close() {
082            getLogger().log(Level.FINE, "CamelConduit closed ");
083    
084            // ensure resources held by session factory are released
085            //
086            base.close();
087        }
088    
089        protected Logger getLogger() {
090            return LOG;
091        }
092    
093        public String getBeanName() {
094            EndpointInfo info = base.endpointInfo;
095            if (info == null) {
096                return "default.camel-conduit";
097            }
098            return info.getName() + ".camel-conduit";
099        }
100    
101        private void initConfig() {
102    
103    /*
104            this.address = base.endpointInfo.getTraversedExtensor(new AddressType(),
105                                                                  AddressType.class);
106            this.sessionPool = base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
107                                                                      SessionPoolType.class);
108            this.clientConfig = base.endpointInfo.getTraversedExtensor(new ClientConfig(),
109                                                                       ClientConfig.class);
110            this.runtimePolicy = base.endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(),
111                                                                        ClientBehaviorPolicyType.class);
112    */
113    
114            Configurer configurer = base.bus.getExtension(Configurer.class);
115            if (null != configurer) {
116                configurer.configureBean(this);
117            }
118        }
119    
120        private class CamelOutputStream extends AbstractCachedOutputStream {
121            private Message outMessage;
122            private boolean isOneWay;
123    
124            public CamelOutputStream(Message m) {
125                outMessage = m;
126            }
127    
128            protected void doFlush() throws IOException {
129                //do nothing here
130            }
131    
132            protected void doClose() throws IOException {
133                isOneWay = outMessage.getExchange().isOneWay();
134                commitOutputMessage();
135                if (!isOneWay) {
136                    handleResponse();
137                }
138            }
139    
140            protected void onWrite() throws IOException {
141    
142            }
143    
144            private void commitOutputMessage() {
145                base.template.send(targetCamelEndpointUri, new Processor() {
146                    public void process(org.apache.camel.Exchange reply) {
147                        Object request = null;
148    
149                        if (isTextPayload()) {
150                            request = currentStream.toString();
151                        }
152                        else {
153                            request = ((ByteArrayOutputStream) currentStream).toByteArray();
154                        }
155    
156                        getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
157                        String replyTo = base.getReplyDestination();
158    
159                        //TODO setting up the responseExpected
160    
161                        base.marshal(request, replyTo, reply);
162    
163                        base.setMessageProperties(outMessage, reply);
164    
165                        String correlationID = null;
166                        if (!isOneWay) {
167                            // TODO create a correlationID
168                            String id = null;
169    
170                            if (id != null) {
171                                if (correlationID != null) {
172                                    String error = "User cannot set CamelCorrelationID when "
173                                            + "making a request/reply invocation using "
174                                            + "a static replyTo Queue.";
175                                }
176                                correlationID = id;
177                            }
178                        }
179    
180                        if (correlationID != null) {
181                            reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
182                        }
183                        else {
184                            //No message correlation id is set. Whatever comeback will be accepted as responses.
185                            // We assume that it will only happen in case of the temp. reply queue.
186                        }
187    
188                        getLogger().log(Level.FINE, "template sending request: ", reply.getIn());
189                    }
190                });
191            }
192    
193            private void handleResponse() throws IOException {
194                // REVISIT distinguish decoupled case or oneway call
195                Object response = null;
196    
197                //TODO if outMessage need to get the response
198                Message inMessage = new MessageImpl();
199                outMessage.getExchange().setInMessage(inMessage);
200                //set the message header back to the incomeMessage
201                //inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
202                //              outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
203    
204                /*
205                Object result1;
206    
207                Object result = null;
208    
209                javax.camel.Message camelMessage1 = pooledSession.consumer().receive(timeout);
210                getLogger().log(Level.FINE, "template received reply: " , camelMessage1);
211    
212                if (camelMessage1 != null) {
213    
214                    base.populateIncomingContext(camelMessage1, outMessage, CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS);
215                    String messageType = camelMessage1 instanceof TextMessage
216                                ? CamelConstants.TEXT_MESSAGE_TYPE : CamelConstants.BINARY_MESSAGE_TYPE;
217                    result = base.unmarshal((org.apache.camel.Exchange) outMessage);
218                    result1 = result;
219                } else {
220                    String error = "CamelClientTransport.receive() timed out. No message available.";
221                    getLogger().log(Level.SEVERE, error);
222                    //TODO: Review what exception should we throw.
223                    throw new CamelException(error);
224    
225                }
226                response = result1;
227    
228                //set the message header back to the incomeMessage
229                inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
230                              outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
231    
232                */
233    
234                getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
235    
236                // setup the inMessage response stream
237                byte[] bytes = null;
238                if (response instanceof String) {
239                    String requestString = (String) response;
240                    bytes = requestString.getBytes();
241                }
242                else {
243                    bytes = (byte[]) response;
244                }
245                inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
246                getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
247                incomingObserver.onMessage(inMessage);
248            }
249        }
250    
251        private boolean isTextPayload() {
252            // TODO use runtime policy
253            return true;
254        }
255    
256        /**
257         * Represented decoupled response endpoint.
258         */
259        protected class DecoupledDestination implements Destination {
260            protected MessageObserver decoupledMessageObserver;
261            private EndpointReferenceType address;
262    
263            DecoupledDestination(EndpointReferenceType ref,
264                    MessageObserver incomingObserver) {
265                address = ref;
266                decoupledMessageObserver = incomingObserver;
267            }
268    
269            public EndpointReferenceType getAddress() {
270                return address;
271            }
272    
273            public Conduit getBackChannel(Message inMessage,
274                    Message partialResponse,
275                    EndpointReferenceType addr)
276                    throws IOException {
277                // shouldn't be called on decoupled endpoint
278                return null;
279            }
280    
281            public void shutdown() {
282                // TODO Auto-generated method stub
283            }
284    
285            public synchronized void setMessageObserver(MessageObserver observer) {
286                decoupledMessageObserver = observer;
287            }
288    
289            public synchronized MessageObserver getMessageObserver() {
290                return decoupledMessageObserver;
291            }
292        }
293    
294    }