001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.cxf.transport; 018 019 import java.io.ByteArrayInputStream; 020 import java.io.IOException; 021 import java.io.InputStream; 022 import java.io.OutputStream; 023 import java.util.logging.Level; 024 import java.util.logging.Logger; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Producer; 031 import org.apache.cxf.Bus; 032 import org.apache.cxf.common.logging.LogUtils; 033 import org.apache.cxf.configuration.Configurable; 034 import org.apache.cxf.io.CachedOutputStream; 035 import org.apache.cxf.message.Message; 036 import org.apache.cxf.message.MessageImpl; 037 import org.apache.cxf.service.model.EndpointInfo; 038 import org.apache.cxf.transport.AbstractConduit; 039 import org.apache.cxf.transport.AbstractDestination; 040 import org.apache.cxf.transport.Conduit; 041 import org.apache.cxf.transport.ConduitInitiator; 042 import org.apache.cxf.transport.MessageObserver; 043 import org.apache.cxf.ws.addressing.EndpointReferenceType; 044 import org.apache.cxf.wsdl.EndpointReferenceUtils; 045 046 /** 047 * @version $Revision: 563665 $ 048 */ 049 public class CamelDestination extends AbstractDestination implements Configurable { 050 protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-destination-base"; 051 private static final Logger LOG = LogUtils.getL7dLogger(CamelDestination.class); 052 CamelContext camelContext; 053 String camelUri; 054 final ConduitInitiator conduitInitiator; 055 private CamelTransportBase base; 056 private Endpoint endpoint; 057 058 public CamelDestination(CamelContext camelContext, Bus bus, ConduitInitiator ci, EndpointInfo info) throws IOException { 059 super(getTargetReference(info, bus), info); 060 this.camelContext = camelContext; 061 062 base = new CamelTransportBase(camelContext, bus, endpointInfo, true, BASE_BEAN_NAME_SUFFIX); 063 064 conduitInitiator = ci; 065 066 initConfig(); 067 } 068 069 protected Logger getLogger() { 070 return LOG; 071 } 072 073 /** 074 * @param inMessage the incoming message 075 * @return the inbuilt backchannel 076 */ 077 protected Conduit getInbuiltBackChannel(Message inMessage) { 078 return new BackChannelConduit(EndpointReferenceUtils.getAnonymousEndpointReference(), inMessage); 079 } 080 081 public void activate() { 082 getLogger().log(Level.INFO, "CamelDestination activate().... "); 083 084 try { 085 getLogger().log(Level.FINE, "establishing Camel connection"); 086 endpoint = camelContext.getEndpoint(camelUri); 087 } catch (Exception ex) { 088 getLogger().log(Level.SEVERE, "Camel connect failed with EException : ", ex); 089 } 090 } 091 092 public void deactivate() { 093 base.close(); 094 } 095 096 public void shutdown() { 097 getLogger().log(Level.FINE, "CamelDestination shutdown()"); 098 this.deactivate(); 099 } 100 101 protected void incoming(Exchange exchange) { 102 getLogger().log(Level.FINE, "server received request: ", exchange); 103 104 byte[] bytes = base.unmarshal(exchange); 105 106 // get the message to be interceptor 107 MessageImpl inMessage = new MessageImpl(); 108 inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes)); 109 base.populateIncomingContext(exchange, inMessage, CamelConstants.CAMEL_SERVER_REQUEST_HEADERS); 110 // inMessage.put(CamelConstants.CAMEL_SERVER_RESPONSE_HEADERS, new 111 // CamelMessageHeadersType()); 112 inMessage.put(CamelConstants.CAMEL_REQUEST_MESSAGE, exchange); 113 114 inMessage.setDestination(this); 115 116 // handle the incoming message 117 incomingObserver.onMessage(inMessage); 118 } 119 120 public String getBeanName() { 121 return endpointInfo.getName().toString() + ".camel-destination"; 122 } 123 124 private void initConfig() { 125 /* 126 * this.runtimePolicy = endpointInfo.getTraversedExtensor(new 127 * ServerBehaviorPolicyType(), ServerBehaviorPolicyType.class); 128 * this.serverConfig = endpointInfo.getTraversedExtensor(new 129 * ServerConfig(), ServerConfig.class); this.address = 130 * endpointInfo.getTraversedExtensor(new AddressType(), 131 * AddressType.class); this.sessionPool = 132 * endpointInfo.getTraversedExtensor(new SessionPoolType(), 133 * SessionPoolType.class); 134 */ 135 } 136 137 protected class ConsumerProcessor implements Processor { 138 public void process(Exchange exchange) { 139 try { 140 incoming(exchange); 141 } catch (Throwable ex) { 142 getLogger().log(Level.WARNING, "Failed to process incoming message : ", ex); 143 } 144 } 145 } 146 147 // this should deal with the cxf message 148 protected class BackChannelConduit extends AbstractConduit { 149 protected Message inMessage; 150 151 BackChannelConduit(EndpointReferenceType ref, Message message) { 152 super(ref); 153 inMessage = message; 154 } 155 156 /** 157 * Register a message observer for incoming messages. 158 * 159 * @param observer the observer to notify on receipt of incoming 160 */ 161 public void setMessageObserver(MessageObserver observer) { 162 // shouldn't be called for a back channel conduit 163 } 164 165 /** 166 * Send an outbound message, assumed to contain all the name-value 167 * mappings of the corresponding input message (if any). 168 * 169 * @param message the message to be sent. 170 */ 171 public void prepare(Message message) throws IOException { 172 // setup the message to be send back 173 message.put(CamelConstants.CAMEL_REQUEST_MESSAGE, inMessage.get(CamelConstants.CAMEL_REQUEST_MESSAGE)); 174 message.setContent(OutputStream.class, new CamelOutputStream(inMessage)); 175 } 176 177 protected Logger getLogger() { 178 return LOG; 179 } 180 181 } 182 183 private class CamelOutputStream extends CachedOutputStream { 184 private Message inMessage; 185 private Producer<Exchange> replyTo; 186 private Producer<Exchange> sender; 187 188 // setup the ByteArrayStream 189 public CamelOutputStream(Message m) { 190 super(); 191 inMessage = m; 192 } 193 194 // prepair the message and get the send out message 195 private void commitOutputMessage() throws IOException { 196 197 // setup the reply message 198 final String replyToUri = getReplyToDestination(inMessage); 199 200 base.template.send(replyToUri, new Processor() { 201 public void process(Exchange reply) { 202 base.marshal(currentStream.toString(), replyToUri, reply); 203 204 setReplyCorrelationID(inMessage, reply); 205 206 base.setMessageProperties(inMessage, reply); 207 208 getLogger().log(Level.FINE, "just server sending reply: ", reply); 209 } 210 }); 211 } 212 213 @Override 214 protected void doFlush() throws IOException { 215 // Do nothing here 216 } 217 218 @Override 219 protected void doClose() throws IOException { 220 commitOutputMessage(); 221 } 222 223 @Override 224 protected void onWrite() throws IOException { 225 // Do nothing here 226 } 227 } 228 229 protected String getReplyToDestination(Message inMessage) { 230 if (inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO) != null) { 231 return (String)inMessage.get(CamelConstants.CAMEL_REBASED_REPLY_TO); 232 } else { 233 return base.getReplyDestination(); 234 } 235 } 236 237 protected void setReplyCorrelationID(Message inMessage, Exchange reply) { 238 Object value = inMessage.get(CamelConstants.CAMEL_CORRELATION_ID); 239 if (value != null) { 240 reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, value); 241 } 242 } 243 }