Coverage Report - org.apache.camel.component.cxf.transport.CamelConduit
 
Classes in this File Line Coverage Branch Coverage Complexity
CamelConduit
0% 
0% 
1.55
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.component.cxf.transport;
 18  
 
 19  
 import java.io.ByteArrayInputStream;
 20  
 import java.io.ByteArrayOutputStream;
 21  
 import java.io.IOException;
 22  
 import java.io.InputStream;
 23  
 import java.io.OutputStream;
 24  
 import java.util.logging.Level;
 25  
 import java.util.logging.Logger;
 26  
 
 27  
 import org.apache.camel.CamelContext;
 28  
 import org.apache.camel.Processor;
 29  
 import org.apache.cxf.Bus;
 30  
 import org.apache.cxf.common.logging.LogUtils;
 31  
 import org.apache.cxf.configuration.Configurable;
 32  
 import org.apache.cxf.configuration.Configurer;
 33  
 import org.apache.cxf.io.CachedOutputStream;
 34  
 import org.apache.cxf.message.Message;
 35  
 import org.apache.cxf.message.MessageImpl;
 36  
 import org.apache.cxf.service.model.EndpointInfo;
 37  
 import org.apache.cxf.transport.AbstractConduit;
 38  
 import org.apache.cxf.transport.Conduit;
 39  
 import org.apache.cxf.transport.Destination;
 40  
 import org.apache.cxf.transport.MessageObserver;
 41  
 import org.apache.cxf.ws.addressing.AttributedURIType;
 42  
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 43  
 
 44  
 /**
 45  
  * @version $Revision: 563665 $
 46  
  */
 47  0
 public class CamelConduit extends AbstractConduit implements Configurable {
 48  
     protected static final String BASE_BEAN_NAME_SUFFIX = ".camel-conduit-base";
 49  0
     private static final Logger LOG = LogUtils.getL7dLogger(CamelConduit.class);
 50  
     private final CamelTransportBase base;
 51  
     private String targetCamelEndpointUri;
 52  
 
 53  
     /*
 54  
      * protected ClientConfig clientConfig; protected ClientBehaviorPolicyType
 55  
      * runtimePolicy; protected AddressType address; protected SessionPoolType
 56  
      * sessionPool;
 57  
      */
 58  
 
 59  
     public CamelConduit(CamelContext camelContext, Bus bus, EndpointInfo endpointInfo, EndpointReferenceType targetReference) {
 60  0
         super(targetReference);
 61  0
         AttributedURIType address = targetReference.getAddress();
 62  0
         if (address != null) {
 63  0
             this.targetCamelEndpointUri = address.getValue();
 64  
         }
 65  
 
 66  0
         base = new CamelTransportBase(camelContext, bus, endpointInfo, false, BASE_BEAN_NAME_SUFFIX);
 67  
 
 68  0
         initConfig();
 69  0
     }
 70  
 
 71  
     // prepare the message for send out , not actually send out the message
 72  
     public void prepare(Message message) throws IOException {
 73  0
         getLogger().log(Level.FINE, "CamelConduit send message");
 74  
 
 75  0
         message.setContent(OutputStream.class, new CamelOutputStream(message));
 76  0
     }
 77  
 
 78  
     public void close() {
 79  0
         getLogger().log(Level.FINE, "CamelConduit closed ");
 80  
 
 81  
         // ensure resources held by session factory are released
 82  
         //
 83  0
         base.close();
 84  0
     }
 85  
 
 86  
     protected Logger getLogger() {
 87  0
         return LOG;
 88  
     }
 89  
 
 90  
     public String getBeanName() {
 91  0
         EndpointInfo info = base.endpointInfo;
 92  0
         if (info == null) {
 93  0
             return "default.camel-conduit";
 94  
         }
 95  0
         return info.getName() + ".camel-conduit";
 96  
     }
 97  
 
 98  
     private void initConfig() {
 99  
 
 100  
         /*
 101  
          * this.address = base.endpointInfo.getTraversedExtensor(new
 102  
          * AddressType(), AddressType.class); this.sessionPool =
 103  
          * base.endpointInfo.getTraversedExtensor(new SessionPoolType(),
 104  
          * SessionPoolType.class); this.clientConfig =
 105  
          * base.endpointInfo.getTraversedExtensor(new ClientConfig(),
 106  
          * ClientConfig.class); this.runtimePolicy =
 107  
          * base.endpointInfo.getTraversedExtensor(new
 108  
          * ClientBehaviorPolicyType(), ClientBehaviorPolicyType.class);
 109  
          */
 110  
 
 111  0
         Configurer configurer = base.bus.getExtension(Configurer.class);
 112  0
         if (null != configurer) {
 113  0
             configurer.configureBean(this);
 114  
         }
 115  0
     }
 116  
 
 117  0
     private class CamelOutputStream extends CachedOutputStream {
 118  
         private Message outMessage;
 119  
         private boolean isOneWay;
 120  
 
 121  0
         public CamelOutputStream(Message m) {
 122  0
             outMessage = m;
 123  0
         }
 124  
 
 125  
         protected void doFlush() throws IOException {
 126  
             // do nothing here
 127  0
         }
 128  
 
 129  
         protected void doClose() throws IOException {
 130  0
             isOneWay = outMessage.getExchange().isOneWay();
 131  0
             commitOutputMessage();
 132  0
             if (!isOneWay) {
 133  0
                 handleResponse();
 134  
             }
 135  0
         }
 136  
 
 137  
         protected void onWrite() throws IOException {
 138  
 
 139  0
         }
 140  
 
 141  
         private void commitOutputMessage() {
 142  0
             base.template.send(targetCamelEndpointUri, new Processor() {
 143  0
                 public void process(org.apache.camel.Exchange reply) {
 144  0
                     Object request = null;
 145  0
                     if (isTextPayload()) {
 146  0
                         request = currentStream.toString();
 147  0
                     } else {
 148  0
                         request = ((ByteArrayOutputStream)currentStream).toByteArray();
 149  
                     }
 150  
 
 151  0
                     getLogger().log(Level.FINE, "Conduit Request is :[" + request + "]");
 152  0
                     String replyTo = base.getReplyDestination();
 153  
                     // TODO setting up the responseExpected
 154  0
                     base.marshal(request, replyTo, reply);
 155  0
                     base.setMessageProperties(outMessage, reply);
 156  
 
 157  0
                     String correlationID = null;
 158  0
                     if (!isOneWay) {
 159  
                         // TODO create a correlationID
 160  0
                         String id = null;
 161  
 
 162  0
                         if (id != null) {
 163  0
                             if (correlationID != null) {
 164  0
                                 String error = "User cannot set CamelCorrelationID when " + "making a request/reply invocation using " + "a static replyTo Queue.";
 165  
                             }
 166  0
                             correlationID = id;
 167  
                         }
 168  
                     }
 169  
 
 170  0
                     if (correlationID != null) {
 171  0
                         reply.getIn().setHeader(CamelConstants.CAMEL_CORRELATION_ID, correlationID);
 172  
                     } else {
 173  
                         // No message correlation id is set. Whatever comeback
 174  
                         // will be accepted as responses.
 175  
                         // We assume that it will only happen in case of the
 176  
                         // temp. reply queue.
 177  
                     }
 178  
 
 179  0
                     getLogger().log(Level.FINE, "template sending request: ", reply.getIn());
 180  0
                 }
 181  
             });
 182  0
         }
 183  
 
 184  
         private void handleResponse() throws IOException {
 185  
             // REVISIT distinguish decoupled case or oneway call
 186  0
             Object response = null;
 187  
 
 188  
             // TODO if outMessage need to get the response
 189  0
             Message inMessage = new MessageImpl();
 190  0
             outMessage.getExchange().setInMessage(inMessage);
 191  
             // set the message header back to the incomeMessage
 192  
             // inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
 193  
             // outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
 194  
 
 195  
             /*
 196  
              * Object result1; Object result = null; javax.camel.Message
 197  
              * camelMessage1 = pooledSession.consumer().receive(timeout);
 198  
              * getLogger().log(Level.FINE, "template received reply: " ,
 199  
              * camelMessage1); if (camelMessage1 != null) {
 200  
              * base.populateIncomingContext(camelMessage1, outMessage,
 201  
              * CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS); String messageType =
 202  
              * camelMessage1 instanceof TextMessage ?
 203  
              * CamelConstants.TEXT_MESSAGE_TYPE :
 204  
              * CamelConstants.BINARY_MESSAGE_TYPE; result =
 205  
              * base.unmarshal((org.apache.camel.Exchange) outMessage); result1 =
 206  
              * result; } else { String error = "CamelClientTransport.receive()
 207  
              * timed out. No message available."; getLogger().log(Level.SEVERE,
 208  
              * error); //TODO: Review what exception should we throw. throw new
 209  
              * CamelException(error); } response = result1; //set the message
 210  
              * header back to the incomeMessage
 211  
              * inMessage.put(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS,
 212  
              * outMessage.get(CamelConstants.CAMEL_CLIENT_RESPONSE_HEADERS));
 213  
              */
 214  
 
 215  0
             getLogger().log(Level.FINE, "The Response Message is : [" + response + "]");
 216  
 
 217  
             // setup the inMessage response stream
 218  0
             byte[] bytes = null;
 219  0
             if (response instanceof String) {
 220  0
                 String requestString = (String)response;
 221  0
                 bytes = requestString.getBytes();
 222  0
             } else {
 223  0
                 bytes = (byte[])response;
 224  
             }
 225  0
             inMessage.setContent(InputStream.class, new ByteArrayInputStream(bytes));
 226  0
             getLogger().log(Level.FINE, "incoming observer is " + incomingObserver);
 227  0
             incomingObserver.onMessage(inMessage);
 228  0
         }
 229  
     }
 230  
 
 231  
     private boolean isTextPayload() {
 232  
         // TODO use runtime policy
 233  0
         return true;
 234  
     }
 235  
 
 236  
     /**
 237  
      * Represented decoupled response endpoint.
 238  
      */
 239  
     protected class DecoupledDestination implements Destination {
 240  
         protected MessageObserver decoupledMessageObserver;
 241  
         private EndpointReferenceType address;
 242  
 
 243  0
         DecoupledDestination(EndpointReferenceType ref, MessageObserver incomingObserver) {
 244  0
             address = ref;
 245  0
             decoupledMessageObserver = incomingObserver;
 246  0
         }
 247  
 
 248  
         public EndpointReferenceType getAddress() {
 249  0
             return address;
 250  
         }
 251  
 
 252  
         public Conduit getBackChannel(Message inMessage, Message partialResponse, EndpointReferenceType addr) throws IOException {
 253  
             // shouldn't be called on decoupled endpoint
 254  0
             return null;
 255  
         }
 256  
 
 257  
         public void shutdown() {
 258  
             // TODO Auto-generated method stub
 259  0
         }
 260  
 
 261  
         public synchronized void setMessageObserver(MessageObserver observer) {
 262  0
             decoupledMessageObserver = observer;
 263  0
         }
 264  
 
 265  
         public synchronized MessageObserver getMessageObserver() {
 266  0
             return decoupledMessageObserver;
 267  
         }
 268  
     }
 269  
 
 270  
 }