001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Iterator; 021 import java.util.List; 022 import java.util.Map; 023 import java.util.concurrent.ConcurrentHashMap; 024 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.ExchangePattern; 029 import org.apache.camel.Message; 030 import org.apache.camel.spi.Synchronization; 031 import org.apache.camel.spi.UnitOfWork; 032 import org.apache.camel.util.ExchangeHelper; 033 import org.apache.camel.util.ObjectHelper; 034 import org.apache.camel.util.UuidGenerator; 035 036 /** 037 * A default implementation of {@link Exchange} 038 * 039 * @version $Revision: 792977 $ 040 */ 041 public final class DefaultExchange implements Exchange { 042 043 private static final UuidGenerator DEFAULT_ID_GENERATOR = new UuidGenerator(); 044 protected final CamelContext context; 045 private Map<String, Object> properties; 046 private Message in; 047 private Message out; 048 private Message fault; 049 private Exception exception; 050 private String exchangeId; 051 private UnitOfWork unitOfWork; 052 private ExchangePattern pattern; 053 private Endpoint fromEndpoint; 054 private List<Synchronization> onCompletions; 055 056 public DefaultExchange(CamelContext context) { 057 this(context, ExchangePattern.InOnly); 058 } 059 060 public DefaultExchange(CamelContext context, ExchangePattern pattern) { 061 this.context = context; 062 this.pattern = pattern; 063 } 064 065 public DefaultExchange(Exchange parent) { 066 this(parent.getContext(), parent.getPattern()); 067 this.unitOfWork = parent.getUnitOfWork(); 068 this.fromEndpoint = parent.getFromEndpoint(); 069 } 070 071 public DefaultExchange(Endpoint fromEndpoint) { 072 this(fromEndpoint, ExchangePattern.InOnly); 073 } 074 075 public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { 076 this.context = fromEndpoint.getCamelContext(); 077 this.fromEndpoint = fromEndpoint; 078 this.pattern = pattern; 079 } 080 081 @Override 082 public String toString() { 083 return "Exchange[" + in + "]"; 084 } 085 086 public Exchange copy() { 087 Exchange exchange = newInstance(); 088 exchange.copyFrom(this); 089 return exchange; 090 } 091 092 public Exchange copy(boolean handoverOnCompletion) { 093 Exchange copy = copy(); 094 // do not share the unit of work 095 copy.setUnitOfWork(null); 096 // handover on completeion to the copy if we got any 097 if (handoverOnCompletion && unitOfWork != null) { 098 unitOfWork.handoverSynchronization(copy); 099 } 100 // set a correlation id so we can track back the original exchange 101 copy.setProperty(Exchange.CORRELATION_ID, this.getExchangeId()); 102 return copy; 103 } 104 105 public void copyFrom(Exchange exchange) { 106 if (exchange == this) { 107 return; 108 } 109 setProperties(safeCopy(exchange.getProperties())); 110 111 // this can cause strangeness if we copy, say, a FileMessage onto an FtpExchange with overloaded getExchange() methods etc. 112 safeCopy(getIn(), exchange.getIn()); 113 if (exchange.hasOut()) { 114 safeCopy(getOut(), exchange.getOut()); 115 } 116 if (exchange.hasFault()) { 117 safeCopy(getFault(), exchange.getFault()); 118 } 119 setException(exchange.getException()); 120 121 unitOfWork = exchange.getUnitOfWork(); 122 pattern = exchange.getPattern(); 123 setFromEndpoint(exchange.getFromEndpoint()); 124 } 125 126 private static void safeCopy(Message message, Message that) { 127 if (message != null) { 128 message.copyFrom(that); 129 } 130 } 131 132 private static Map<String, Object> safeCopy(Map<String, Object> properties) { 133 if (properties == null) { 134 return null; 135 } 136 return new ConcurrentHashMap<String, Object>(properties); 137 } 138 139 public Exchange newInstance() { 140 return new DefaultExchange(this); 141 } 142 143 public CamelContext getContext() { 144 return context; 145 } 146 147 public Object getProperty(String name) { 148 if (properties != null) { 149 return properties.get(name); 150 } 151 return null; 152 } 153 154 public <T> T getProperty(String name, Class<T> type) { 155 Object value = getProperty(name); 156 157 // eager same instance type test to avoid the overhead of invoking the type converter 158 // if already same type 159 if (type.isInstance(value)) { 160 return type.cast(value); 161 } 162 163 return ExchangeHelper.convertToType(this, type, value); 164 } 165 166 public void setProperty(String name, Object value) { 167 if (value != null) { 168 // avoid the NullPointException 169 getProperties().put(name, value); 170 } else { 171 // if the value is null, we just remove the key from the map 172 if (name != null) { 173 getProperties().remove(name); 174 } 175 } 176 } 177 178 public Object removeProperty(String name) { 179 return getProperties().remove(name); 180 } 181 182 public Map<String, Object> getProperties() { 183 if (properties == null) { 184 properties = new ConcurrentHashMap<String, Object>(); 185 } 186 return properties; 187 } 188 189 public void setProperties(Map<String, Object> properties) { 190 this.properties = properties; 191 } 192 193 public Message getIn() { 194 if (in == null) { 195 in = new DefaultMessage(); 196 configureMessage(in); 197 } 198 return in; 199 } 200 201 public void setIn(Message in) { 202 this.in = in; 203 configureMessage(in); 204 } 205 206 public Message getOut() { 207 return getOut(true); 208 } 209 210 public boolean hasOut() { 211 return out != null; 212 } 213 214 public Message getOut(boolean lazyCreate) { 215 if (out == null && lazyCreate) { 216 out = (in != null && in instanceof MessageSupport) 217 ? ((MessageSupport)in).newInstance() : new DefaultMessage(); 218 configureMessage(out); 219 } 220 return out; 221 } 222 223 public void setOut(Message out) { 224 this.out = out; 225 configureMessage(out); 226 } 227 228 public Exception getException() { 229 return exception; 230 } 231 232 public <T> T getException(Class<T> type) { 233 if (exception == null) { 234 return null; 235 } 236 237 Iterator<Throwable> it = ObjectHelper.createExceptionIterator(exception); 238 while (it.hasNext()) { 239 Throwable e = it.next(); 240 if (type.isInstance(e)) { 241 return type.cast(e); 242 } 243 } 244 // not found 245 return null; 246 } 247 248 public void setException(Exception exception) { 249 this.exception = exception; 250 } 251 252 public ExchangePattern getPattern() { 253 return pattern; 254 } 255 256 public void setPattern(ExchangePattern pattern) { 257 this.pattern = pattern; 258 } 259 260 public Endpoint getFromEndpoint() { 261 return fromEndpoint; 262 } 263 264 public void setFromEndpoint(Endpoint fromEndpoint) { 265 this.fromEndpoint = fromEndpoint; 266 } 267 268 public boolean hasFault() { 269 return fault != null; 270 } 271 272 public Message getFault() { 273 return getFault(true); 274 } 275 276 public Message getFault(boolean lazyCreate) { 277 if (fault == null && lazyCreate) { 278 fault = (in != null && in instanceof MessageSupport) 279 ? ((MessageSupport)in).newInstance() : new DefaultMessage(); 280 configureMessage(fault); 281 } 282 return fault; 283 } 284 285 public void setFault(Message fault) { 286 this.fault = fault; 287 configureMessage(fault); 288 } 289 290 public String getExchangeId() { 291 if (exchangeId == null) { 292 exchangeId = createExchangeId(); 293 } 294 return exchangeId; 295 } 296 297 public void setExchangeId(String id) { 298 this.exchangeId = id; 299 } 300 301 public boolean isFailed() { 302 if (hasFault()) { 303 Object faultBody = getFault().getBody(); 304 if (faultBody != null) { 305 return true; 306 } 307 } 308 return getException() != null; 309 } 310 311 public boolean isTransacted() { 312 Boolean transacted = getProperty(TRANSACTED, Boolean.class); 313 return transacted != null && transacted; 314 } 315 316 public boolean isRollbackOnly() { 317 Boolean rollback = getProperty(ROLLBACK_ONLY, Boolean.class); 318 return rollback != null && rollback; 319 } 320 321 public UnitOfWork getUnitOfWork() { 322 return unitOfWork; 323 } 324 325 public void setUnitOfWork(UnitOfWork unitOfWork) { 326 this.unitOfWork = unitOfWork; 327 if (this.onCompletions != null) { 328 // now an unit of work has been assigned so add the on completions 329 // we might have registered already 330 for (Synchronization onCompletion : this.onCompletions) { 331 this.unitOfWork.addSynchronization(onCompletion); 332 } 333 // cleanup the temporary on completion list as they now have been registered 334 // on the unit of work 335 this.onCompletions.clear(); 336 this.onCompletions = null; 337 } 338 } 339 340 public void addOnCompletion(Synchronization onCompletion) { 341 if (this.unitOfWork == null) { 342 // unit of work not yet registered so we store the on completion temporary 343 // until the unit of work is assigned to this exchange by the UnitOfWorkProcessor 344 if (this.onCompletions == null) { 345 this.onCompletions = new ArrayList<Synchronization>(); 346 } 347 this.onCompletions.add(onCompletion); 348 } else { 349 this.getUnitOfWork().addSynchronization(onCompletion); 350 } 351 } 352 353 /** 354 * Configures the message after it has been set on the exchange 355 */ 356 protected void configureMessage(Message message) { 357 if (message instanceof MessageSupport) { 358 MessageSupport messageSupport = (MessageSupport)message; 359 messageSupport.setExchange(this); 360 } 361 } 362 363 protected String createExchangeId() { 364 String answer = null; 365 if (in != null) { 366 answer = in.createExchangeId(); 367 } 368 if (answer == null) { 369 answer = DefaultExchange.DEFAULT_ID_GENERATOR.generateId(); 370 } 371 return answer; 372 } 373 }