001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.processor;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.Expression;
021    import org.apache.camel.Processor;
022    import org.apache.camel.bam.model.ProcessDefinition;
023    import org.apache.camel.bam.rules.ActivityRules;
024    import org.apache.camel.util.IntrospectionSupport;
025    import org.apache.commons.logging.Log;
026    import org.apache.commons.logging.LogFactory;
027    import org.springframework.orm.jpa.JpaTemplate;
028    import org.springframework.transaction.support.TransactionTemplate;
029    
030    import java.util.List;
031    
032    /**
033     * A base class for JPA based BAM which can use any entity to store the process
034     * instance information which allows derived classes to specialise the process
035     * instance entity.
036     *
037     * @version $Revision: $
038     */
039    public class JpaBamProcessorSupport<T> extends BamProcessorSupport<T> {
040        private static final transient Log LOG = LogFactory.getLog(JpaBamProcessorSupport.class);
041        
042        private ActivityRules activityRules;
043        private JpaTemplate template;
044        private String findByKeyQuery;
045        private String keyPropertyName = "correlationKey";
046        private boolean correlationKeyIsPrimary = true;
047    
048        public JpaBamProcessorSupport(TransactionTemplate transactionTemplate, JpaTemplate template, Expression<Exchange> correlationKeyExpression, ActivityRules activityRules, Class<T> entitytype) {
049            super(transactionTemplate, correlationKeyExpression, entitytype);
050            this.activityRules = activityRules;
051            this.template = template;
052        }
053    
054        public JpaBamProcessorSupport(TransactionTemplate transactionTemplate, JpaTemplate template, Expression<Exchange> correlationKeyExpression, ActivityRules activityRules) {
055            super(transactionTemplate, correlationKeyExpression);
056            this.activityRules = activityRules;
057            this.template = template;
058        }
059    
060        public String getFindByKeyQuery() {
061            if (findByKeyQuery == null) {
062                findByKeyQuery = createFindByKeyQuery();
063            }
064            return findByKeyQuery;
065        }
066    
067        public void setFindByKeyQuery(String findByKeyQuery) {
068            this.findByKeyQuery = findByKeyQuery;
069        }
070    
071        public ActivityRules getActivityRules() {
072            return activityRules;
073        }
074    
075        public void setActivityRules(ActivityRules activityRules) {
076            this.activityRules = activityRules;
077        }
078    
079        public String getKeyPropertyName() {
080            return keyPropertyName;
081        }
082    
083        public void setKeyPropertyName(String keyPropertyName) {
084            this.keyPropertyName = keyPropertyName;
085        }
086    
087        public JpaTemplate getTemplate() {
088            return template;
089        }
090    
091        public void setTemplate(JpaTemplate template) {
092            this.template = template;
093        }
094    
095        public boolean isCorrelationKeyIsPrimary() {
096            return correlationKeyIsPrimary;
097        }
098    
099        public void setCorrelationKeyIsPrimary(boolean correlationKeyIsPrimary) {
100            this.correlationKeyIsPrimary = correlationKeyIsPrimary;
101        }
102    
103        // Implementatiom methods
104        // -----------------------------------------------------------------------
105        protected T loadEntity(Exchange exchange, Object key) {
106            T entity = findEntityByCorrelationKey(key);
107            if (entity == null) {
108                entity = createEntity(exchange, key);
109                setKeyProperty(entity, key);
110                ProcessDefinition definition = ProcessDefinition.getRefreshedProcessDefinition(template, getActivityRules().getProcessRules().getProcessDefinition());
111                setProcessDefinitionProperty(entity, definition);
112                template.persist(entity);
113    
114                // Now we must flush to avoid concurrent updates clashing trying to insert the
115                // same row
116                LOG.debug("About to flush on entity: " + entity + " with key: " + key);
117                template.flush();
118            }
119            return entity;
120        }
121    
122        protected T findEntityByCorrelationKey(Object key) {
123            if (isCorrelationKeyIsPrimary()) {
124                return template.find(getEntityType(), key);
125            }
126            else {
127                List<T> list = template.find(getFindByKeyQuery(), key);
128                if (list.isEmpty()) {
129                    return null;
130                }
131                else {
132                    return list.get(0);
133                }
134            }
135        }
136    
137        /**
138         * Sets the key property on the new entity
139         */
140        protected void setKeyProperty(T entity, Object key) {
141            IntrospectionSupport.setProperty(entity, getKeyPropertyName(), key);
142        }
143    
144        protected void setProcessDefinitionProperty(T entity, ProcessDefinition processDefinition) {
145            IntrospectionSupport.setProperty(entity, "processDefinition", processDefinition);
146        }
147    
148        /**
149         * Create a new instance of the entity for the given key
150         */
151        protected T createEntity(Exchange exchange, Object key) {
152            return (T) exchange.getContext().getInjector().newInstance(getEntityType());
153        }
154    
155        protected void processEntity(Exchange exchange, T entity) throws Exception {
156            if (entity instanceof Processor) {
157                Processor processor = (Processor) entity;
158                processor.process(exchange);
159            }
160            else {
161                // TODO add other extension points - eg. passing in Activity
162                throw new IllegalArgumentException("No processor defined for this route");
163            }
164        }
165    
166        protected String createFindByKeyQuery() {
167            return "select x from " + getEntityType().getName() + " x where x." + getKeyPropertyName() + " = ?1";
168        }
169    }