001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.cxf.transport;
018    
019    import java.io.ByteArrayInputStream;
020    import java.io.IOException;
021    import java.io.InputStream;
022    import java.io.OutputStream;
023    import java.util.logging.Level;
024    import java.util.logging.Logger;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.cxf.Bus;
032    import org.apache.cxf.common.logging.LogUtils;
033    import org.apache.cxf.configuration.Configurable;
034    import org.apache.cxf.io.CachedOutputStream;
035    import org.apache.cxf.message.Message;
036    import org.apache.cxf.message.MessageImpl;
037    import org.apache.cxf.service.model.EndpointInfo;
038    import org.apache.cxf.transport.AbstractConduit;
039    import org.apache.cxf.transport.AbstractDestination;
040    import org.apache.cxf.transport.Conduit;
041    import org.apache.cxf.transport.ConduitInitiator;
042    import org.apache.cxf.transport.MessageObserver;
043    import org.apache.cxf.ws.addressing.EndpointReferenceType;
044    import org.apache.cxf.wsdl.EndpointReferenceUtils;
045    
046    /**
047     * @version $Revision: 563665 $
048     */
049    public class CamelDestination extends AbstractDestination implements Configurable {
050        protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base";
051        private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class);
052        CamelContext camelContext;
053        String camelUri;
054        final ConduitInitiator conduitInitiator;
055        private CamelTransportBase base;
056        private Endpoint endpoint;
057    
058        public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException {
059            super(getTargetReference(info, bus), info);
060            this.camelContext = camelContext;
061    
062            base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX);
063    
064            conduitInitiator = ci;
065    
066            initConfig();
067        }
068    
069        protected Logger getLogger() {
070            return LOG;
071        }
072    
073        /**
074         * @param inMessage the incoming message
075         * @return the inbuilt backchannel
076         */
077        protected Conduit getInbuiltBackChannel(Message inMessage) {
078            return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage);
079        }
080    
081        public void activate() {
082            getLogger().log(Level.INFO, "CamelDestination activate().... ");
083    
084            try {
085                getLogger().log(Level.FINE, "establishing Camel connection");
086                endpoint = camelContext.getEndpoint(camelUri);
087            } catch (Exception ex) {
088                getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex);
089            }
090        }
091    
092        public void deactivate() {
093            base.close();
094        }
095    
096        public void shutdown() {
097            getLogger().log(Level.FINE, "CamelDestination shutdown()");
098            this.deactivate();
099        }
100    
101        protected void incoming(Exchange exchange) {
102            getLogger().log(Level.FINE, "server received request: ", exchange);
103    
104            byte[] bytes = base.unmarshal(exchange);
105    
106            // get the message to be interceptor
107            MessageImpl inMessage = new MessageImpl();
108            inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
109            base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS);
110            // inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new
111            // CamelMessageHeadersType());
112            inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange);
113    
114            inMessage.setDestination(this);
115    
116            // handle the incoming message
117            incomingObserver.onMessage(inMessage);
118        }
119    
120        public String getBeanName() {
121            return endpointInfo.getName().toString() + ".camel-destination";
122        }
123    
124        private void initConfig() {
125            /*
126             * this.runtimePolicy = endpointInfo.getTraversedExtensor(new
127             * ServerBehaviorPolicyType(), ServerBehaviorPolicyType.class);
128             * this.serverConfig = endpointInfo.getTraversedExtensor(new
129             * ServerConfig(), ServerConfig.class); this.address =
130             * endpointInfo.getTraversedExtensor(new AddressType(),
131             * AddressType.class); this.sessionPool =
132             * endpointInfo.getTraversedExtensor(new SessionPoolType(),
133             * SessionPoolType.class);
134             */
135        }
136    
137        protected class ConsumerProcessor implements Processor {
138            public void process(Exchange exchange) {
139                try {
140                    incoming(exchange);
141                } catch (Throwable ex) {
142                    getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex);
143                }
144            }
145        }
146    
147        // this should deal with the cxf message
148        protected class BackChannelConduit extends AbstractConduit {
149            protected Message inMessage;
150    
151            BackChannelConduit(EndpointReferenceType ref, Message message) {
152                super(ref);
153                inMessage = message;
154            }
155    
156            /**
157             * Register a message observer for incoming messages.
158             * 
159             * @param observer the observer to notify on receipt of incoming
160             */
161            public void setMessageObserver(MessageObserver observer) {
162                // shouldn't be called for a back channel conduit
163            }
164    
165            /**
166             * Send an outbound message, assumed to contain all the name-value
167             * mappings of the corresponding input message (if any).
168             * 
169             * @param message the message to be sent.
170             */
171            public void prepare(Message message) throws IOException {
172                // setup the message to be send back
173                message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE));
174                message.setContent(OutputStream.class, new CamelOutputStream(inMessage));
175            }
176    
177            protected Logger getLogger() {
178                return LOG;
179            }
180    
181        }
182    
183        private class CamelOutputStream extends CachedOutputStream {
184            private Message inMessage;
185            private Producer<Exchange> replyTo;
186            private Producer<Exchange> sender;
187    
188            // setup the ByteArrayStream
189            public CamelOutputStream(Message m) {
190                super();
191                inMessage = m;
192            }
193    
194            // prepair the message and get the send out message
195            private void commitOutputMessage() throws IOException {
196    
197                // setup the reply message
198                final String replyToUri = getReplyToDestination(inMessage);
199    
200                base.template.send(replyToUri, new Processor() {
201                    public void process(Exchange reply) {
202                        base.marshal(currentStream.toString(), replyToUri, reply);
203    
204                        setReplyCorrelationID(inMessage, reply);
205    
206                        base.setMessageProperties(inMessage, reply);
207    
208                        getLogger().log(Level.FINE, "just server sending reply: ", reply);
209                    }
210                });
211            }
212    
213            @Override
214            protected void doFlush() throws IOException {
215                // Do nothing here
216            }
217    
218            @Override
219            protected void doClose() throws IOException {
220                commitOutputMessage();
221            }
222    
223            @Override
224            protected void onWrite() throws IOException {
225                // Do nothing here
226            }
227        }
228    
229        protected String getReplyToDestination(Message inMessage) {
230            if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) {
231                return (String)inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO);
232            } else {
233                return base.getReplyDestination();
234            }
235        }
236    
237        protected void setReplyCorrelationID(Message inMessage, Exchange reply) {
238            Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID);
239            if (value != null) {
240                reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value);
241            }
242        }
243    }