001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Expression;
021    import org.apache.camel.Processor;
022    import org.apache.camel.RuntimeCamelException;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.springframework.dao.DataIntegrityViolationException;
026    import org.springframework.orm.jpa.JpaSystemException;
027    import org.springframework.transaction.TransactionStatus;
028    import org.springframework.transaction.support.TransactionCallback;
029    import org.springframework.transaction.support.TransactionTemplate;
030    
031    import javax.persistence.EntityExistsException;
032    import java.lang.reflect.ParameterizedType;
033    import java.lang.reflect.Type;
034    
035    /**
036     * A base {@link Processor} for working on <a
037     * href="http://activemq.apache.org/camel/bam.html">BAM</a> which a derived
038     * class would do the actual persistence such as the {@link JpaBamProcessor}
039     *
040     * @version $Revision: $
041     */
042    public abstract class BamProcessorSupport<T> implements Processor {
043        private static final transient Log LOG = LogFactory.getLog(BamProcessorSupport.class);
044        private Class<T> entityType;
045        private Expression<Exchange> correlationKeyExpression;
046        private TransactionTemplate transactionTemplate;
047        private int maximumRetries = 30;
048    
049        public int getMaximumRetries() {
050            return maximumRetries;
051        }
052    
053        public void setMaximumRetries(int maximumRetries) {
054            this.maximumRetries = maximumRetries;
055        }
056    
057        protected BamProcessorSupport(TransactionTemplate transactionTemplate, Expression<Exchange> correlationKeyExpression) {
058            this.transactionTemplate = transactionTemplate;
059            this.correlationKeyExpression = correlationKeyExpression;
060    
061            Type type = getClass().getGenericSuperclass();
062            if (type instanceof ParameterizedType) {
063                ParameterizedType parameterizedType = (ParameterizedType) type;
064                Type[] arguments = parameterizedType.getActualTypeArguments();
065                if (arguments.length > 0) {
066                    Type argumentType = arguments[0];
067                    if (argumentType instanceof Class) {
068                        this.entityType = (Class<T>) argumentType;
069                    }
070                }
071            }
072            if (entityType == null) {
073                throw new IllegalArgumentException("Could not infer the entity type!");
074            }
075        }
076    
077        protected BamProcessorSupport(TransactionTemplate transactionTemplate, Expression<Exchange> correlationKeyExpression, Class<T> entitytype) {
078            this.transactionTemplate = transactionTemplate;
079            this.entityType = entitytype;
080            this.correlationKeyExpression = correlationKeyExpression;
081        }
082    
083        public void process(final Exchange exchange) {
084            Object entity = null;
085            for (int i = 0; entity == null && i < maximumRetries; i++) {
086                if (i > 0) {
087                    LOG.info("Retry attempt due to duplicate row: " + i);
088                }
089                entity = transactionTemplate.execute(new TransactionCallback() {
090                    public Object doInTransaction(TransactionStatus status) {
091                        try {
092                            Object key = getCorrelationKey(exchange);
093    
094                            T entity = loadEntity(exchange, key);
095    
096                            if (LOG.isDebugEnabled()) {
097                                LOG.debug("Correlation key: " + key + " with entity: " + entity);
098                            }
099                            processEntity(exchange, entity);
100    
101                            return entity;
102                        }
103                        catch (JpaSystemException e) {
104                            if (LOG.isDebugEnabled()) {
105                                LOG.debug("Likely exception is due to duplicate row in concurrent setting: " + e, e);
106                            }
107                            LOG.info("Attempt to insert duplicate row due to concurrency issue, so retrying: " + e);
108                            return retryDueToDuplicate(status);
109                        }
110                        catch (DataIntegrityViolationException e) {
111                            Throwable throwable = e.getCause();
112                            if (throwable instanceof EntityExistsException) {
113                                LOG.info("Attempt to insert duplicate row due to concurrency issue, so retrying: " + throwable);
114                                return retryDueToDuplicate(status);
115                            }
116                            return onError(status, throwable);
117                        }
118                        catch (Throwable e) {
119                            return onError(status, e);
120                        }
121                    }
122                });
123            }
124        }
125    
126        // Properties
127        // -----------------------------------------------------------------------
128        public Expression<Exchange> getCorrelationKeyExpression() {
129            return correlationKeyExpression;
130        }
131    
132        public Class<T> getEntityType() {
133            return entityType;
134        }
135    
136        // Implemenation methods
137        // -----------------------------------------------------------------------
138        protected abstract void processEntity(Exchange exchange, T entity) throws Exception;
139    
140        protected abstract T loadEntity(Exchange exchange, Object key);
141    
142        protected Object getCorrelationKey(Exchange exchange) throws NoCorrelationKeyException {
143            Object value = correlationKeyExpression.evaluate(exchange);
144            if (value == null) {
145                throw new NoCorrelationKeyException(this, exchange);
146            }
147            return value;
148        }
149    
150        protected Object retryDueToDuplicate(TransactionStatus status) {
151            status.setRollbackOnly();
152            return null;
153        }
154    
155        protected Object onError(TransactionStatus status, Throwable e) {
156            status.setRollbackOnly();
157            LOG.error("Caught: " + e, e);
158            throw new RuntimeCamelException(e);
159        }
160    }