/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.cxf.transport;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.camel.CamelContext;
import org.apache.camel.Processor;
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.configuration.Configurable;
import org.apache.cxf.configuration.Configurer;
import org.apache.cxf.io.CachedOutputStream;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

/**
 * @version $Revision: 563665 $
 */
047 public class CamelConduit extends AbstractConduit implements Configurable { 048 protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit-base"; 049 private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class); 050 private final CamelTransportBase base; 051 private String targetCamelEndpointUri; 052 053 /* 054 * protected ClientConfig clientConfig; protected ClientBehaviorPolicyType 055 * runtimePolicy; protected AddressType address; protected SessionPoolType 056 * sessionPool; 057 */ 058 059 public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType targetReference) { 060 super(targetReference); 061 AttributedURIType address = targetReference.getAddress(); 062 if (address != null) { 063 this.targetCamelEndpointUri = address.getValue(); 064 } 065 066 base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX); 067 068 initConfig(); 069 } 070 071 // prepare the message for send out , not actually send out the message 072 public void prepare(Message message) throws IOException { 073 getLogger().log(Level.FINE, "CamelConduit send message"); 074 075 message.setContent(OutputStream.class, new CamelOutputStream(message)); 076 } 077 078 public void close() { 079 getLogger().log(Level.FINE, "CamelConduit closed "); 080 081 // ensure resources held by session factory are released 082 // 083 base.close(); 084 } 085 086 protected Logger getLogger() { 087 return LOG; 088 } 089 090 public String getBeanName() { 091 EndpointInfo info = base.endpointInfo; 092 if (info == null) { 093 return "default.camel-conduit"; 094 } 095 return info.getName() + ".camel-conduit"; 096 } 097 098 private void initConfig() { 099 100 /* 101 * this.address = base.endpointInfo.getTraversedExtensor(new 102 * AddressType(), AddressType.class); this.sessionPool = 103 * base.endpointInfo.getTraversedExtensor(new SessionPoolType(), 104 * SessionPoolType.class); this.clientConfig = 105 * 
base.endpointInfo.getTraversedExtensor(new ClientConfig(), 106 * ClientConfig.class); this.runtimePolicy = 107 * base.endpointInfo.getTraversedExtensor(new 108 * ClientBehaviorPolicyType(), ClientBehaviorPolicyType.class); 109 */ 110 111 Configurer configurer = base.bus.getExtension(Configurer.class); 112 if (null != configurer) { 113 configurer.configureBean(this); 114 } 115 } 116 117 private class CamelOutputStream extends CachedOutputStream { 118 private Message outMessage; 119 private boolean isOneWay; 120 121 public CamelOutputStream(Message m) { 122 outMessage = m; 123 } 124 125 protected void doFlush() throws IOException { 126 // do nothing here 127 } 128 129 protected void doClose() throws IOException { 130 isOneWay = outMessage.getExchange().isOneWay(); 131 commitOutputMessage(); 132 if (!isOneWay) { 133 handleResponse(); 134 } 135 } 136 137 protected void onWrite() throws IOException { 138 139 } 140 141 private void commitOutputMessage() { 142 base.template.send(targetCamelEndpointUri, new Processor() { 143 public void process(org.apache.camel.Exchange reply) { 144 Object request = null; 145 if (isTextPayload()) { 146 request = currentStream.toString(); 147 } else { 148 request = ((ByteArrayOutputStream)currentStream).toByteArray(); 149 } 150 151 getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]"); 152 String replyTo = base.getReplyDestination(); 153 // TODO setting up the responseExpected 154 base.marshal(request, replyTo, reply); 155 base.setMessageProperties(outMessage, reply); 156 157 String correlationID = null; 158 if (!isOneWay) { 159 // TODO create a correlationID 160 String id = null; 161 162 if (id != null) { 163 if (correlationID != null) { 164 String error = "User cannot set CamelCorrelationID when " + "making a request/reply invocation using " + "a static replyTo Queue."; 165 } 166 correlationID = id; 167 } 168 } 169 170 if (correlationID != null) { 171 reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, 
correlationID); 172 } else { 173 // No message correlation id is set. Whatever comeback 174 // will be accepted as responses. 175 // We assume that it will only happen in case of the 176 // temp. reply queue. 177 } 178 179 getLogger().log(Level.FINE, "template sending request: ", reply.getIn()); 180 } 181 }); 182 } 183 184 private void handleResponse() throws IOException { 185 // REVISIT distinguish decoupled case or oneway call 186 Object response = null; 187 188 // TODO if outMessage need to get the response 189 Message inMessage = new MessageImpl(); 190 outMessage.getExchange().setInMessage(inMessage); 191 // set the message header back to the incomeMessage 192 // inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS, 193 // outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS)); 194 195 /* 196 * Object result1; Object result = null; javax.camel.Message 197 * camelMessage1 = pooledSession.consumer().receive(timeout); 198 * getLogger().log(Level.FINE, "template received reply: " , 199 * camelMessage1); if (camelMessage1 != null) { 200 * base.populateIncomingContext(camelMessage1, outMessage, 201 * CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS); String messageType = 202 * camelMessage1 instanceof TextMessage ? 203 * CamelConstants.TEXT_MESSAGE_TYPE : 204 * CamelConstants.BINARY_MESSAGE_TYPE; result = 205 * base.unmarshal((org.apache.camel.Exchange) outMessage); result1 = 206 * result; } else { String error = "CamelClientTransport.receive() 207 * timed out. No message available."; getLogger().log(Level.SEVERE, 208 * error); //TODO: Review what exception should we throw. 
throw new 209 * CamelException(error); } response = result1; //set the message 210 * header back to the incomeMessage 211 * inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS, 212 * outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS)); 213 */ 214 215 getLogger().log(Level.FINE, "The Response Message is : [" + response + "]"); 216 217 // setup the inMessage response stream 218 byte[] bytes = null; 219 if (response instanceof String) { 220 String requestString = (String)response; 221 bytes = requestString.getBytes(); 222 } else { 223 bytes = (byte[])response; 224 } 225 inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes)); 226 getLogger().log(Level.FINE, "incoming observer is " + incomingObserver); 227 incomingObserver.onMessage(inMessage); 228 } 229 } 230 231 private boolean isTextPayload() { 232 // TODO use runtime policy 233 return true; 234 } 235 236 /** 237 * Represented decoupled response endpoint. 238 */ 239 protected class DecoupledDestination implements Destination { 240 protected MessageObserver decoupledMessageObserver; 241 private EndpointReferenceType address; 242 243 DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) { 244 address = ref; 245 decoupledMessageObserver = incomingObserver; 246 } 247 248 public EndpointReferenceType getAddress() { 249 return address; 250 } 251 252 public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException { 253 // shouldn't be called on decoupled endpoint 254 return null; 255 } 256 257 public void shutdown() { 258 // TODO Auto-generated method stub 259 } 260 261 public synchronized void setMessageObserver(MessageObserver observer) { 262 decoupledMessageObserver = observer; 263 } 264 265 public synchronized MessageObserver getMessageObserver() { 266 return decoupledMessageObserver; 267 } 268 } 269 270 }