/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.cxf;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultProducer;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.local.LocalConduit;
import org.apache.cxf.transport.local.LocalTransportFactory;

/**
 * Sends messages from Camel into the CXF endpoint.
 * <p>
 * The destination and conduit are obtained from the transport factory in
 * {@link #doStart()}; the producer must be started before
 * {@link #process(Exchange)} is called. Exchanges are serialized on the
 * conduit, so only one exchange is in flight per producer at a time.
 *
 * @version $Revision: 563665 $
 */
public class CxfProducer extends DefaultProducer {
    private CxfEndpoint endpoint;
    private final LocalTransportFactory transportFactory;
    private Destination destination;
    private Conduit conduit;
    private final ResultFuture future = new ResultFuture();

    /**
     * Creates a producer for the given endpoint.
     *
     * @param endpoint the endpoint supplying the binding, the endpoint info
     *            and the in-out flag used by this producer
     * @param transportFactory the factory from which the destination and
     *            conduit are obtained when the producer is started
     */
    public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) {
        super(endpoint);
        this.endpoint = endpoint;
        this.transportFactory = transportFactory;
    }

    /**
     * Converts the exchange to a {@link CxfExchange} via the endpoint and
     * delegates to {@link #process(CxfExchange)}.
     *
     * @param exchange the exchange to send
     */
    public void process(Exchange exchange) {
        CxfExchange cxfExchange = endpoint.toExchangeType(exchange);
        process(cxfExchange);
    }

    /**
     * Builds a CXF message from the exchange, hands it to the conduit and,
     * when the endpoint is in-out, waits for the reply and stores it on the
     * exchange through the binding.
     *
     * @param exchange the exchange to send
     * @throws RuntimeCamelException wrapping any {@link IOException} raised
     *             while preparing the message on the conduit
     */
    public void process(CxfExchange exchange) {
        try {
            CxfBinding binding = endpoint.getBinding();
            MessageImpl m = binding.createCxfMessage(exchange);
            ExchangeImpl cxfExchange = new ExchangeImpl();
            cxfExchange.setInMessage(m);
            m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
            m.setDestination(destination);
            synchronized (conduit) {
                // The future is shared by every exchange sent through this
                // producer, so forget the previous reply before sending;
                // otherwise the wait below would return immediately with
                // the stale message from an earlier exchange.
                future.reset();
                conduit.prepare(m);

                // now lets wait for the response
                if (endpoint.isInOut()) {
                    future.getResponse();

                    // TODO - why do we need to ignore the returned message and
                    // get the out message from the exchange!
                    Message response = cxfExchange.getOutMessage();
                    binding.storeCxfResponse(exchange, response);
                }
            }
        } catch (IOException e) {
            throw new RuntimeCamelException(e);
        }
    }

    /**
     * Looks up the destination and conduit for the endpoint and registers
     * the response listener on the conduit.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        EndpointInfo endpointInfo = endpoint.getEndpointInfo();
        destination = transportFactory.getDestination(endpointInfo);

        // Set up a listener for the response
        conduit = transportFactory.getConduit(endpointInfo);
        conduit.setMessageObserver(future);
    }

    /**
     * Closes the conduit, if one was created by {@link #doStart()}.
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();

        if (conduit != null) {
            conduit.close();
        }
    }

    /**
     * Message observer that captures the message delivered to it and lets a
     * caller block until that delivery has happened. An instance can be
     * reused for several exchanges by calling {@link #reset()} before each
     * one.
     */
    protected class ResultFuture implements MessageObserver {
        // Both fields are read without holding the monitor in getResponse(),
        // hence volatile.
        volatile Message response;
        volatile CountDownLatch latch = new CountDownLatch(1);

        /**
         * Discards any previously captured message and arms a fresh latch so
         * that the next {@link #getResponse()} waits for a new delivery.
         */
        public synchronized void reset() {
            response = null;
            latch = new CountDownLatch(1);
        }

        /**
         * Blocks until {@link #onMessage(Message)} has been invoked since
         * construction or the last {@link #reset()}, then returns the
         * captured message.
         * <p>
         * The wait is not abandoned on interruption; instead the interrupt
         * status of the calling thread is restored before returning.
         *
         * @return the captured message, or <code>null</code> if the observer
         *         was notified but failed before it could store the message
         */
        public Message getResponse() {
            // Capture the latch once so a concurrent reset() cannot swap it
            // out from under this wait.
            CountDownLatch current = latch;
            boolean interrupted = false;
            try {
                while (true) {
                    try {
                        current.await();
                        return response;
                    } catch (InterruptedException e) {
                        // keep waiting, but remember to re-assert the
                        // interrupt for the caller
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Stores the delivered message, after removing the
         * {@link LocalConduit#DIRECT_DISPATCH} entry from it, and releases
         * any thread waiting in {@link #getResponse()}. Waiters are released
         * even if storing the message fails.
         *
         * @param message the message delivered by the conduit
         */
        public synchronized void onMessage(Message message) {
            try {
                message.remove(LocalConduit.DIRECT_DISPATCH);
                this.response = message;
            } finally {
                latch.countDown();
            }
        }
    }
}