001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.cxf.transport;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.ByteArrayOutputStream;
021    import java.io.IOException;
022    import java.io.InputStream;
023    import java.io.OutputStream;
024    import java.util.logging.Level;
025    import java.util.logging.Logger;
026    
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.Processor;
029    import org.apache.cxf.Bus;
030    import org.apache.cxf.common.logging.LogUtils;
031    import org.apache.cxf.configuration.Configurable;
032    import org.apache.cxf.configuration.Configurer;
033    import org.apache.cxf.io.CachedOutputStream;
034    import org.apache.cxf.message.Message;
035    import org.apache.cxf.message.MessageImpl;
036    import org.apache.cxf.service.model.EndpointInfo;
037    import org.apache.cxf.transport.AbstractConduit;
038    import org.apache.cxf.transport.Conduit;
039    import org.apache.cxf.transport.Destination;
040    import org.apache.cxf.transport.MessageObserver;
041    import org.apache.cxf.ws.addressing.AttributedURIType;
042    import org.apache.cxf.ws.addressing.EndpointReferenceType;
043    
044    /**
045     * @version $Revision: 563665 $
046     */
047    public class CamelConduit extends AbstractConduit implements Configurable {
048        protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit-base";
049        private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class);
050        private final CamelTransportBase base;
051        private String targetCamelEndpointUri;
052    
053        /*
054         * protected ClientConfig clientConfig; protected ClientBehaviorPolicyType
055         * runtimePolicy; protected AddressType address; protected SessionPoolType
056         * sessionPool;
057         */
058    
059        public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType targetReference) {
060            super(targetReference);
061            AttributedURIType address = targetReference.getAddress();
062            if (address != null) {
063                this.targetCamelEndpointUri = address.getValue();
064            }
065    
066            base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
067    
068            initConfig();
069        }
070    
071        // prepare the message for send out , not actually send out the message
072        public void prepare(Message message) throws IOException {
073            getLogger().log(Level.FINE, "CamelConduit send message");
074    
075            message.setContent(OutputStream.class, new CamelOutputStream(message));
076        }
077    
078        public void close() {
079            getLogger().log(Level.FINE, "CamelConduit closed ");
080    
081            // ensure resources held by session factory are released
082            //
083            base.close();
084        }
085    
086        protected Logger getLogger() {
087            return LOG;
088        }
089    
090        public String getBeanName() {
091            EndpointInfo info = base.endpointInfo;
092            if (info == null) {
093                return "default.camel-conduit";
094            }
095            return info.getName() + ".camel-conduit";
096        }
097    
098        private void initConfig() {
099    
100            /*
101             * this.address = base.endpointInfo.getTraversedExtensor(new
102             * AddressType(), AddressType.class); this.sessionPool =
103             * base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
104             * SessionPoolType.class); this.clientConfig =
105             * base.endpointInfo.getTraversedExtensor(new ClientConfig(),
106             * ClientConfig.class); this.runtimePolicy =
107             * base.endpointInfo.getTraversedExtensor(new
108             * ClientBehaviorPolicyType(), ClientBehaviorPolicyType.class);
109             */
110    
111            Configurer configurer = base.bus.getExtension(Configurer.class);
112            if (null != configurer) {
113                configurer.configureBean(this);
114            }
115        }
116    
117        private class CamelOutputStream extends CachedOutputStream {
118            private Message outMessage;
119            private boolean isOneWay;
120    
121            public CamelOutputStream(Message m) {
122                outMessage = m;
123            }
124    
125            protected void doFlush() throws IOException {
126                // do nothing here
127            }
128    
129            protected void doClose() throws IOException {
130                isOneWay = outMessage.getExchange().isOneWay();
131                commitOutputMessage();
132                if (!isOneWay) {
133                    handleResponse();
134                }
135            }
136    
137            protected void onWrite() throws IOException {
138    
139            }
140    
141            private void commitOutputMessage() {
142                base.template.send(targetCamelEndpointUri, new Processor() {
143                    public void process(org.apache.camel.Exchange reply) {
144                        Object request = null;
145                        if (isTextPayload()) {
146                            request = currentStream.toString();
147                        } else {
148                            request = ((ByteArrayOutputStream)currentStream).toByteArray();
149                        }
150    
151                        getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
152                        String replyTo = base.getReplyDestination();
153                        // TODO setting up the responseExpected
154                        base.marshal(request, replyTo, reply);
155                        base.setMessageProperties(outMessage, reply);
156    
157                        String correlationID = null;
158                        if (!isOneWay) {
159                            // TODO create a correlationID
160                            String id = null;
161    
162                            if (id != null) {
163                                if (correlationID != null) {
164                                    String error = "User cannot set CamelCorrelationID when " + "making a request/reply invocation using " + "a static replyTo Queue.";
165                                }
166                                correlationID = id;
167                            }
168                        }
169    
170                        if (correlationID != null) {
171                            reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
172                        } else {
173                            // No message correlation id is set. Whatever comeback
174                            // will be accepted as responses.
175                            // We assume that it will only happen in case of the
176                            // temp. reply queue.
177                        }
178    
179                        getLogger().log(Level.FINE, "template sending request: ", reply.getIn());
180                    }
181                });
182            }
183    
184            private void handleResponse() throws IOException {
185                // REVISIT distinguish decoupled case or oneway call
186                Object response = null;
187    
188                // TODO if outMessage need to get the response
189                Message inMessage = new MessageImpl();
190                outMessage.getExchange().setInMessage(inMessage);
191                // set the message header back to the incomeMessage
192                // inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
193                // outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
194    
195                /*
196                 * Object result1; Object result = null; javax.camel.Message
197                 * camelMessage1 = pooledSession.consumer().receive(timeout);
198                 * getLogger().log(Level.FINE, "template received reply: " ,
199                 * camelMessage1); if (camelMessage1 != null) {
200                 * base.populateIncomingContext(camelMessage1, outMessage,
201                 * CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS); String messageType =
202                 * camelMessage1 instanceof TextMessage ?
203                 * CamelConstants.TEXT_MESSAGE_TYPE :
204                 * CamelConstants.BINARY_MESSAGE_TYPE; result =
205                 * base.unmarshal((org.apache.camel.Exchange) outMessage); result1 =
206                 * result; } else { String error = "CamelClientTransport.receive()
207                 * timed out. No message available."; getLogger().log(Level.SEVERE,
208                 * error); //TODO: Review what exception should we throw. throw new
209                 * CamelException(error); } response = result1; //set the message
210                 * header back to the incomeMessage
211                 * inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
212                 * outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
213                 */
214    
215                getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
216    
217                // setup the inMessage response stream
218                byte[] bytes = null;
219                if (response instanceof String) {
220                    String requestString = (String)response;
221                    bytes = requestString.getBytes();
222                } else {
223                    bytes = (byte[])response;
224                }
225                inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
226                getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
227                incomingObserver.onMessage(inMessage);
228            }
229        }
230    
231        private boolean isTextPayload() {
232            // TODO use runtime policy
233            return true;
234        }
235    
236        /**
237         * Represented decoupled response endpoint.
238         */
239        protected class DecoupledDestination implements Destination {
240            protected MessageObserver decoupledMessageObserver;
241            private EndpointReferenceType address;
242    
243            DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
244                address = ref;
245                decoupledMessageObserver = incomingObserver;
246            }
247    
248            public EndpointReferenceType getAddress() {
249                return address;
250            }
251    
252            public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException {
253                // shouldn't be called on decoupled endpoint
254                return null;
255            }
256    
257            public void shutdown() {
258                // TODO Auto-generated method stub
259            }
260    
261            public synchronized void setMessageObserver(MessageObserver observer) {
262                decoupledMessageObserver = observer;
263            }
264    
265            public synchronized MessageObserver getMessageObserver() {
266                return decoupledMessageObserver;
267            }
268        }
269    
270    }