001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.cxf; 019 020 import org.apache.camel.RuntimeCamelException; 021 import org.apache.camel.Exchange; 022 import org.apache.camel.impl.DefaultProducer; 023 import org.apache.cxf.message.ExchangeImpl; 024 import org.apache.cxf.message.Message; 025 import org.apache.cxf.message.MessageImpl; 026 import org.apache.cxf.service.model.EndpointInfo; 027 import org.apache.cxf.transport.Conduit; 028 import org.apache.cxf.transport.Destination; 029 import org.apache.cxf.transport.MessageObserver; 030 import org.apache.cxf.transport.local.LocalConduit; 031 import org.apache.cxf.transport.local.LocalTransportFactory; 032 033 import java.io.IOException; 034 import java.util.concurrent.CountDownLatch; 035 036 /** 037 * Sends messages from Camel into the CXF endpoint 038 * 039 * @version $Revision: 534145 $ 040 */ 041 public class CxfProducer extends DefaultProducer { 042 private CxfEndpoint endpoint; 043 private final LocalTransportFactory transportFactory; 044 private Destination destination; 045 private Conduit conduit; 046 private ResultFuture future = new ResultFuture(); 047 048 public CxfProducer(CxfEndpoint endpoint, LocalTransportFactory transportFactory) { 049 super(endpoint); 050 this.endpoint = endpoint; 051 this.transportFactory = transportFactory; 052 } 053 054 public void process(Exchange exchange) { 055 CxfExchange cxfExchange = endpoint.toExchangeType(exchange); 056 process(cxfExchange); 057 } 058 059 public void process(CxfExchange exchange) { 060 try { 061 CxfBinding binding = endpoint.getBinding(); 062 MessageImpl m = binding.createCxfMessage(exchange); 063 ExchangeImpl e = new ExchangeImpl(); 064 e.setInMessage(m); 065 m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE); 066 m.setDestination(destination); 067 synchronized (conduit) { 068 conduit.prepare(m); 069 070 // now lets wait for the response 071 if (endpoint.isInOut()) { 072 Message response = future.getResponse(); 073 074 // TODO - why do we need to ignore the returned message and get the out message from the exchange! 075 response = e.getOutMessage(); 076 binding.storeCxfResponse(exchange, response); 077 } 078 } 079 } 080 catch (IOException e) { 081 throw new RuntimeCamelException(e); 082 } 083 } 084 085 @Override 086 protected void doStart() throws Exception { 087 super.doStart(); 088 EndpointInfo endpointInfo = endpoint.getEndpointInfo(); 089 destination = transportFactory.getDestination(endpointInfo); 090 091 // Set up a listener for the response 092 conduit = transportFactory.getConduit(endpointInfo); 093 conduit.setMessageObserver(future); 094 } 095 096 @Override 097 protected void doStop() throws Exception { 098 super.doStop(); 099 100 if (conduit != null) { 101 conduit.close(); 102 } 103 } 104 105 protected class ResultFuture implements MessageObserver { 106 Message response; 107 CountDownLatch latch = new CountDownLatch(1); 108 109 public Message getResponse() { 110 while (response == null) { 111 try { 112 latch.await(); 113 } 114 catch (InterruptedException e) { 115 // ignore 116 } 117 } 118 return response; 119 } 120 121 public synchronized void onMessage(Message message) { 122 try { 123 message.remove(LocalConduit.DIRECT_DISPATCH); 124 this.response = message; 125 } 126 finally { 127 latch.countDown(); 128 } 129 } 130 } 131 }