001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.cxf;
018    
019    import java.io.IOException;
020    import java.util.concurrent.CountDownLatch;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.RuntimeCamelException;
024    import org.apache.camel.impl.DefaultProducer;
025    import org.apache.cxf.message.ExchangeImpl;
026    import org.apache.cxf.message.Message;
027    import org.apache.cxf.message.MessageImpl;
028    import org.apache.cxf.service.model.EndpointInfo;
029    import org.apache.cxf.transport.Conduit;
030    import org.apache.cxf.transport.Destination;
031    import org.apache.cxf.transport.MessageObserver;
032    import org.apache.cxf.transport.local.LocalConduit;
033    import org.apache.cxf.transport.local.LocalTransportFactory;
034    
035    /**
036     * Sends messages from Camel into the CXF endpoint
037     * 
038     * @version $Revision: 563665 $
039     */
040    public class CxfProducer extends DefaultProducer {
041        private CxfEndpoint endpoint;
042        private final LocalTransportFactory transportFactory;
043        private Destination destination;
044        private Conduit conduit;
045        private ResultFuture future = new ResultFuture();
046    
047        public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
048            super(endpoint);
049            this.endpoint = endpoint;
050            this.transportFactory = transportFactory;
051        }
052    
053        public void process(Exchange exchange) {
054            CxfExchange cxfExchange = endpoint.toExchangeType(exchange);
055            process(cxfExchange);
056        }
057    
058        public void process(CxfExchange exchange) {
059            try {
060                CxfBinding binding = endpoint.getBinding();
061                MessageImpl m = binding.createCxfMessage(exchange);
062                ExchangeImpl e = new ExchangeImpl();
063                e.setInMessage(m);
064                m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
065                m.setDestination(destination);
066                synchronized (conduit) {
067                    conduit.prepare(m);
068    
069                    // now lets wait for the response
070                    if (endpoint.isInOut()) {
071                        Message response = future.getResponse();
072    
073                        // TODO - why do we need to ignore the returned message and
074                        // get the out message from the exchange!
075                        response = e.getOutMessage();
076                        binding.storeCxfResponse(exchange, response);
077                    }
078                }
079            } catch (IOException e) {
080                throw new RuntimeCamelException(e);
081            }
082        }
083    
084        @Override
085        protected void doStart() throws Exception {
086            super.doStart();
087            EndpointInfo endpointInfo = endpoint.getEndpointInfo();
088            destination = transportFactory.getDestination(endpointInfo);
089    
090            // Set up a listener for the response
091            conduit = transportFactory.getConduit(endpointInfo);
092            conduit.setMessageObserver(future);
093        }
094    
095        @Override
096        protected void doStop() throws Exception {
097            super.doStop();
098    
099            if (conduit != null) {
100                conduit.close();
101            }
102        }
103    
104        protected class ResultFuture implements MessageObserver {
105            Message response;
106            CountDownLatch latch = new CountDownLatch(1);
107    
108            public Message getResponse() {
109                while (response == null) {
110                    try {
111                        latch.await();
112                    } catch (InterruptedException e) {
113                        // ignore
114                    }
115                }
116                return response;
117            }
118    
119            public synchronized void onMessage(Message message) {
120                try {
121                    message.remove(LocalConduit.DIRECT_DISPATCH);
122                    this.response = message;
123                } finally {
124                    latch.countDown();
125                }
126            }
127        }
128    }