001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.cxf.transport;
019    
020    import org.apache.camel.CamelContext;
021    import org.apache.camel.Endpoint;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Producer;
025    import org.apache.cxf.Bus;
026    import org.apache.cxf.common.logging.LogUtils;
027    import org.apache.cxf.configuration.Configurable;
028    import org.apache.cxf.io.AbstractCachedOutputStream;
029    import org.apache.cxf.message.Message;
030    import org.apache.cxf.message.MessageImpl;
031    import org.apache.cxf.service.model.EndpointInfo;
032    import org.apache.cxf.transport.AbstractConduit;
033    import org.apache.cxf.transport.AbstractDestination;
034    import org.apache.cxf.transport.Conduit;
035    import org.apache.cxf.transport.ConduitInitiator;
036    import org.apache.cxf.transport.MessageObserver;
037    import org.apache.cxf.ws.addressing.EndpointReferenceType;
038    import org.apache.cxf.wsdl.EndpointReferenceUtils;
039    
040    import java.io.ByteArrayInputStream;
041    import java.io.IOException;
042    import java.io.InputStream;
043    import java.io.OutputStream;
044    import java.util.logging.Level;
045    import java.util.logging.Logger;
046    
047    /**
048     * @version $Revision: 535478 $
049     */
050    public class CamelDestination extends AbstractDestination implements Configurable {
051        protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base";
052        private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
053        CamelContext camelContext;
054        String camelUri;
055        final ConduitInitiator conduitInitiator;
056        private CamelTransportBase base;
057        private Endpoint endpoint;
058    
059        public CamelDestination(CamelContext camelContext, Bus bus,
060                ConduitInitiator ci,
061                EndpointInfo info) throws IOException {
062            super(getTargetReference(info, bus), info);
063            this.camelContext = camelContext;
064    
065            base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
066    
067            conduitInitiator = ci;
068    
069            initConfig();
070        }
071    
072        protected Logger getLogger() {
073            return LOG;
074        }
075    
076        /**
077         * @param inMessage the incoming message
078         * @return the inbuilt backchannel
079         */
080        protected Conduit getInbuiltBackChannel(Message inMessage) {
081            return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(),
082                    inMessage);
083        }
084    
085        public void activate() {
086            getLogger().log(Level.INFO, "CamelDestination activate().... ");
087    
088            try {
089                getLogger().log(Level.FINE, "establishing Camel connection");
090                endpoint = camelContext.getEndpoint(camelUri);
091            }
092            catch (Exception ex) {
093                getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex);
094            }
095        }
096    
097        public void deactivate() {
098            base.close();
099        }
100    
101        public void shutdown() {
102            getLogger().log(Level.FINE, "CamelDestination shutdown()");
103            this.deactivate();
104        }
105    
106        protected void incoming(Exchange exchange) {
107            getLogger().log(Level.FINE, "server received request: ", exchange);
108    
109            byte[] bytes = base.unmarshal(exchange);
110    
111            // get the message to be interceptor
112            MessageImpl inMessage = new MessageImpl();
113            inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
114            base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
115            //inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new CamelMessageHeadersType());
116            inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
117    
118            inMessage.setDestination(this);
119    
120            //handle the incoming message
121            incomingObserver.onMessage(inMessage);
122        }
123    
124        public String getBeanName() {
125            return endpointInfo.getName().toString() + ".camel-destination";
126        }
127    
128        private void initConfig() {
129    /*
130            this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(),
131                                                                   ServerBehaviorPolicyType.class);
132            this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class);
133            this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class);
134            this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class);
135    */
136        }
137    
138        protected class ConsumerProcessor implements Processor {
139            public void process(Exchange exchange) {
140                try {
141                    incoming(exchange);
142                }
143                catch (Throwable ex) {
144                    getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
145                }
146            }
147        }
148    
149        // this should deal with the cxf message
150        protected class BackChannelConduit extends AbstractConduit {
151            protected Message inMessage;
152    
153            BackChannelConduit(EndpointReferenceType ref, Message message) {
154                super(ref);
155                inMessage = message;
156            }
157    
158            /**
159             * Register a message observer for incoming messages.
160             *
161             * @param observer the observer to notify on receipt of incoming
162             */
163            public void setMessageObserver(MessageObserver observer) {
164                // shouldn't be called for a back channel conduit
165            }
166    
167            /**
168             * Send an outbound message, assumed to contain all the name-value
169             * mappings of the corresponding input message (if any).
170             *
171             * @param message the message to be sent.
172             */
173            public void prepare(Message message) throws IOException {
174                // setup the message to be send back
175                message.put(CamelConstants.CAMEL_REQUEST_MESSAGE,
176                        inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
177                message.setContent(OutputStream.class,
178                        new CamelOutputStream(inMessage));
179            }
180    
181            protected Logger getLogger() {
182                return LOG;
183            }
184    
185        }
186    
187        private class CamelOutputStream extends AbstractCachedOutputStream {
188            private Message inMessage;
189            private Producer<Exchange> replyTo;
190            private Producer<Exchange> sender;
191    
192            // setup the ByteArrayStream
193            public CamelOutputStream(Message m) {
194                super();
195                inMessage = m;
196            }
197    
198            // prepair the message and get the send out message
199            private void commitOutputMessage() throws IOException {
200    
201                //setup the reply message
202                final String replyToUri = getReplyToDestination(inMessage);
203    
204                base.template.send(replyToUri, new Processor() {
205                    public void process(Exchange reply) {
206                        base.marshal(currentStream.toString(), replyToUri, reply);
207    
208                        setReplyCorrelationID(inMessage, reply);
209    
210                        base.setMessageProperties(inMessage, reply);
211    
212                        getLogger().log(Level.FINE, "just server sending reply: ", reply);
213                    }
214                });
215            }
216    
217            @Override
218            protected void doFlush() throws IOException {
219                // Do nothing here
220            }
221    
222            @Override
223            protected void doClose() throws IOException {
224                commitOutputMessage();
225            }
226    
227            @Override
228            protected void onWrite() throws IOException {
229                // Do nothing here
230            }
231        }
232    
233        protected String getReplyToDestination(Message inMessage) {
234            if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
235                return (String) inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
236            }
237            else {
238                return base.getReplyDestination();
239            }
240        }
241    
242        protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
243            Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
244            if (value != null) {
245                reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
246            }
247        }
248    }