/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.bam;

import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.TransactionException;

import java.lang.reflect.Type;
import java.lang.reflect.ParameterizedType;

/**
 * A {@link Processor} for working on
 * <a href="http://activemq.apache.org/camel/bam.html">BAM</a>
 * <p>
 * For every exchange this processor evaluates a correlation key, asks the
 * subclass to load the entity for that key, and then asks the subclass to
 * process the exchange against that entity. Both steps run inside a single
 * {@link TransactionTemplate#execute} call.
 *
 * @param <T> the type of entity loaded and processed for each exchange
 * @version $Revision: $
 */
public abstract class BamProcessorSupport<T> implements Processor {
    private static final Log log = LogFactory.getLog(BamProcessorSupport.class);

    private final Class<T> entityType;
    private final Expression<Exchange> correlationKeyExpression;
    private final TransactionTemplate transactionTemplate;

    /**
     * Creates a processor whose entity type is inferred reflectively from the
     * first type argument of the concrete class's generic superclass.
     *
     * @param transactionTemplate      template used to run each exchange inside a transaction
     * @param correlationKeyExpression expression evaluated on each exchange to obtain the correlation key
     * @throws IllegalArgumentException if the entity type cannot be inferred, in which case
     *                                  use the constructor that takes the entity type explicitly
     */
    protected BamProcessorSupport(TransactionTemplate transactionTemplate, Expression<Exchange> correlationKeyExpression) {
        this.transactionTemplate = transactionTemplate;
        this.correlationKeyExpression = correlationKeyExpression;
        this.entityType = inferEntityType(getClass());
    }

    /**
     * Creates a processor with an explicitly supplied entity type.
     *
     * @param transactionTemplate      template used to run each exchange inside a transaction
     * @param correlationKeyExpression expression evaluated on each exchange to obtain the correlation key
     * @param entitytype               the entity type reported by {@link #getEntityType()}
     */
    protected BamProcessorSupport(TransactionTemplate transactionTemplate, Expression<Exchange> correlationKeyExpression, Class<T> entitytype) {
        this.transactionTemplate = transactionTemplate;
        this.entityType = entitytype;
        this.correlationKeyExpression = correlationKeyExpression;
    }

    /**
     * Processes the exchange inside a transaction: evaluates the correlation
     * key, loads the matching entity and hands both to
     * {@link #processEntity(Exchange, Object)}.
     * <p>
     * Any {@link Throwable} raised while doing so is logged at error level and
     * is <b>not</b> propagated to the caller.
     *
     * @param exchange the exchange to process
     */
    public void process(final Exchange exchange) {
        try {
            Object result = transactionTemplate.execute(new TransactionCallback() {
                public Object doInTransaction(TransactionStatus status) {
                    try {
                        Object key = getCorrelationKey(exchange);
                        T entity = loadEntity(exchange, key);

                        log.info("Correlation key: " + key + " with entity: " + entity);

                        processEntity(exchange, entity);
                        return entity;
                    }
                    catch (Exception e) {
                        // This callback method declares no checked exceptions, so wrap
                        // (keeping the cause) to let the failure leave the callback.
                        throw new RuntimeCamelException(e);
                    }
                }
            });

            log.info("After transaction process instance is: " + result);
        }
        catch (Throwable e) {
            // NOTE(review): failures are only logged here, never rethrown, so callers
            // of process() cannot observe them - confirm this is intended.
            log.error("Caught: " + e, e);
        }
    }

    // Properties
    //-----------------------------------------------------------------------

    /**
     * @return the expression used to compute the correlation key of an exchange
     */
    public Expression<Exchange> getCorrelationKeyExpression() {
        return correlationKeyExpression;
    }

    /**
     * @return the entity type, either inferred at construction time or supplied explicitly
     */
    public Class<T> getEntityType() {
        return entityType;
    }

    // Implementation methods
    //-----------------------------------------------------------------------

    /**
     * Performs the actual work for an exchange against its loaded entity.
     * Called from within the transaction started by {@link #process(Exchange)}.
     *
     * @param exchange the exchange being processed
     * @param entity   the entity returned by {@link #loadEntity(Exchange, Object)}
     * @throws Exception on failure; {@link #process(Exchange)} wraps it in a
     *                   {@link RuntimeCamelException}
     */
    protected abstract void processEntity(Exchange exchange, T entity) throws Exception;

    /**
     * Loads the entity associated with the given correlation key.
     * Called from within the transaction started by {@link #process(Exchange)}.
     *
     * @param exchange the exchange being processed
     * @param key      the non-null correlation key returned by {@link #getCorrelationKey(Exchange)}
     * @return the entity to pass to {@link #processEntity(Exchange, Object)}
     */
    protected abstract T loadEntity(Exchange exchange, Object key);

    /**
     * Evaluates the correlation key expression against the exchange.
     *
     * @param exchange the exchange to evaluate
     * @return the correlation key, never {@code null}
     * @throws NoCorrelationKeyException if the expression evaluates to {@code null}
     */
    protected Object getCorrelationKey(Exchange exchange) throws NoCorrelationKeyException {
        Object value = correlationKeyExpression.evaluate(exchange);
        if (value == null) {
            throw new NoCorrelationKeyException(this, exchange);
        }
        return value;
    }

    /**
     * Infers the entity type from the first actual type argument of the given
     * class's generic superclass.
     * <p>
     * Only the immediate generic superclass is inspected, and the argument must
     * be a concrete {@link Class} (not a type variable or a parameterized type).
     *
     * @param processorClass the runtime class of the processor being constructed
     * @return the inferred entity type, never {@code null}
     * @throws IllegalArgumentException if no suitable type argument is found
     */
    private static <E> Class<E> inferEntityType(Class<?> processorClass) {
        Type superType = processorClass.getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type[] arguments = ((ParameterizedType) superType).getActualTypeArguments();
            if (arguments.length > 0 && arguments[0] instanceof Class) {
                // Reflection only yields a Type; the first type argument is taken to be
                // the entity type by convention, which the compiler cannot verify.
                @SuppressWarnings("unchecked")
                Class<E> inferred = (Class<E>) arguments[0];
                return inferred;
            }
        }
        throw new IllegalArgumentException("Could not infer the entity type!");
    }
}