Coverage Report - org.apache.camel.bam.processor.JpaBamProcessorSupport
 
Classes in this File Line Coverage Branch Coverage Complexity
JpaBamProcessorSupport
48% 
40% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.bam.processor;
 18  
 
 19  
 import org.apache.camel.Exchange;
 20  
 import org.apache.camel.Expression;
 21  
 import org.apache.camel.Processor;
 22  
 import org.apache.camel.bam.model.ProcessDefinition;
 23  
 import org.apache.camel.bam.rules.ActivityRules;
 24  
 import org.apache.camel.util.IntrospectionSupport;
 25  
 import org.apache.commons.logging.Log;
 26  
 import org.apache.commons.logging.LogFactory;
 27  
 import org.springframework.orm.jpa.JpaTemplate;
 28  
 import org.springframework.transaction.support.TransactionTemplate;
 29  
 
 30  
 import java.util.List;
 31  
 
 32  
 /**
 33  
  * A base class for JPA based BAM which can use any entity to store the process
 34  
  * instance information which allows derived classes to specialise the process
 35  
  * instance entity.
 36  
  *
 37  
  * @version $Revision: $
 38  
  */
 39  
 public class JpaBamProcessorSupport<T> extends BamProcessorSupport<T> {
 40  2
     // Logger shared by all instances of this class.
     // NOTE(review): 'transient' has no effect on a static field.
     private static final transient Log LOG = LogFactory.getLog(JpaBamProcessorSupport.class);
 41  
     
 42  
     /** Rules whose process rules supply the process definition set on newly created entities. */
     private ActivityRules activityRules;
 43  
     /** JPA template used to find, persist and flush entities. */
     private JpaTemplate template;
 44  
     /** Query used to look up an entity by key; built lazily by {@link #createFindByKeyQuery()} when not set. */
     private String findByKeyQuery;
 45  4
     /** Name of the entity property that receives the correlation key on a new entity. */
     private String keyPropertyName = "correlationKey";
 46  4
     /** When true the key is looked up directly via the template; otherwise the find-by-key query is used. */
     private boolean correlationKeyIsPrimary = true;
 47  
 
 48  
     /**
      * Creates a processor for an explicitly given entity type.
      *
      * @param transactionTemplate forwarded to the superclass constructor
      * @param template the JPA template used for all entity access in this class
      * @param correlationKeyExpression forwarded to the superclass constructor
      * @param activityRules the rules used to resolve the process definition for new entities
      * @param entitytype forwarded to the superclass constructor
      */
     public JpaBamProcessorSupport(TransactionTemplate transactionTemplate, JpaTemplate template, Expression<Exchange> correlationKeyExpression, ActivityRules activityRules, Class<T> entitytype) {
 49  4
         super(transactionTemplate, correlationKeyExpression, entitytype);
 50  4
         this.activityRules = activityRules;
 51  4
         this.template = template;
 52  4
     }
 53  
 
 54  
     /**
      * Creates a processor without passing an entity type to the superclass.
      *
      * @param transactionTemplate forwarded to the superclass constructor
      * @param template the JPA template used for all entity access in this class
      * @param correlationKeyExpression forwarded to the superclass constructor
      * @param activityRules the rules used to resolve the process definition for new entities
      */
     public JpaBamProcessorSupport(TransactionTemplate transactionTemplate, JpaTemplate template, Expression<Exchange> correlationKeyExpression, ActivityRules activityRules) {
 55  0
         super(transactionTemplate, correlationKeyExpression);
 56  0
         this.activityRules = activityRules;
 57  0
         this.template = template;
 58  0
     }
 59  
 
 60  
     /**
      * Returns the find-by-key query, creating and caching the default from
      * {@link #createFindByKeyQuery()} on first use if none has been set.
      */
     public String getFindByKeyQuery() {
 61  0
         if (findByKeyQuery == null) {
 62  0
             findByKeyQuery = createFindByKeyQuery();
 63  
         }
 64  0
         return findByKeyQuery;
 65  
     }
 66  
 
 67  
     /** Overrides the query used to find an entity by key. */
     public void setFindByKeyQuery(String findByKeyQuery) {
 68  0
         this.findByKeyQuery = findByKeyQuery;
 69  0
     }
 70  
 
 71  
     /** Returns the activity rules used when creating new entities. */
     public ActivityRules getActivityRules() {
 72  10
         return activityRules;
 73  
     }
 74  
 
 75  
     /** Replaces the activity rules used when creating new entities. */
     public void setActivityRules(ActivityRules activityRules) {
 76  0
         this.activityRules = activityRules;
 77  0
     }
 78  
 
 79  
     /** Returns the name of the entity property that holds the correlation key. */
     public String getKeyPropertyName() {
 80  4
         return keyPropertyName;
 81  
     }
 82  
 
 83  
     /** Sets the name of the entity property that holds the correlation key. */
     public void setKeyPropertyName(String keyPropertyName) {
 84  0
         this.keyPropertyName = keyPropertyName;
 85  0
     }
 86  
 
 87  
     /** Returns the JPA template used for entity access. */
     public JpaTemplate getTemplate() {
 88  0
         return template;
 89  
     }
 90  
 
 91  
     /** Replaces the JPA template used for entity access. */
     public void setTemplate(JpaTemplate template) {
 92  0
         this.template = template;
 93  0
     }
 94  
 
 95  
     /** Returns whether the correlation key is looked up directly rather than via the find-by-key query. */
     public boolean isCorrelationKeyIsPrimary() {
 96  6
         return correlationKeyIsPrimary;
 97  
     }
 98  
 
 99  
     /** Sets whether the correlation key is looked up directly rather than via the find-by-key query. */
     public void setCorrelationKeyIsPrimary(boolean correlationKeyIsPrimary) {
 100  0
         this.correlationKeyIsPrimary = correlationKeyIsPrimary;
 101  0
     }
 102  
 
 103  
     // Implementation methods
 104  
     // -----------------------------------------------------------------------
 105  
     /**
      * Returns the entity for the given correlation key, creating it if none is found.
      * A new entity gets its key property and process definition set, and is then
      * persisted and flushed before being returned.
      *
      * @param exchange the exchange passed on to {@link #createEntity(Exchange, Object)}
      * @param key the correlation key to look up
      * @return the existing or newly persisted entity
      */
     protected T loadEntity(Exchange exchange, Object key) {
 106  6
         T entity = findEntityByCorrelationKey(key);
 107  6
         if (entity == null) {
 108  4
             entity = createEntity(exchange, key);
 109  4
             setKeyProperty(entity, key);
 110  4
             ProcessDefinition definition = ProcessDefinition.getRefreshedProcessDefinition(template, getActivityRules().getProcessRules().getProcessDefinition());
 111  4
             setProcessDefinitionProperty(entity, definition);
 112  4
             template.persist(entity);
 113  
 
 114  
             // Now we must flush to avoid concurrent updates clashing trying to insert the
 115  
             // same row
 116  4
             // NOTE(review): the message is concatenated even when debug logging is disabled.
             LOG.debug("About to flush on entity: " + entity + " with key: " + key);
 117  4
             template.flush();
 118  
         }
 119  6
         return entity;
 120  
     }
 121  
 
 122  
     /**
      * Looks up the entity for the given key. When the correlation key is primary
      * the template is asked for the entity type and key directly; otherwise the
      * find-by-key query is run with the key as its parameter and the first result is used.
      *
      * @return the first result of the query path, or null when that query returns
      *         an empty list; the direct path returns whatever the template returns
      */
     protected T findEntityByCorrelationKey(Object key) {
 123  6
         if (isCorrelationKeyIsPrimary()) {
 124  6
             return template.find(getEntityType(), key);
 125  
         }
 126  
         else {
 127  0
             List<T> list = template.find(getFindByKeyQuery(), key);
 128  0
             if (list.isEmpty()) {
 129  0
                 return null;
 130  
             }
 131  
             else {
 132  0
                 // Only the first match is used; any further results are ignored.
                 return list.get(0);
 133  
             }
 134  
         }
 135  
     }
 136  
 
 137  
     /**
 138  
      * Sets the key property on the new entity
 139  
      */
 140  
     protected void setKeyProperty(T entity, Object key) {
 141  4
         IntrospectionSupport.setProperty(entity, getKeyPropertyName(), key);
 142  4
     }
 143  
 
 144  
     /** Sets the "processDefinition" property on the entity via introspection. */
     protected void setProcessDefinitionProperty(T entity, ProcessDefinition processDefinition) {
 145  4
         IntrospectionSupport.setProperty(entity, "processDefinition", processDefinition);
 146  4
     }
 147  
 
 148  
     /**
 149  
      * Create a new instance of the entity for the given key
 150  
      */
 151  
     protected T createEntity(Exchange exchange, Object key) {
 152  4
         // The key is not used here; the instance comes from the exchange context's injector.
         // NOTE(review): the cast to T is unchecked.
         return (T) exchange.getContext().getInjector().newInstance(getEntityType());
 153  
     }
 154  
 
 155  
     /**
      * Processes the exchange by delegating to the entity when it is a {@link Processor}.
      *
      * @throws IllegalArgumentException if the entity is not a Processor
      * @throws Exception whatever the entity's own process method throws
      */
     protected void processEntity(Exchange exchange, T entity) throws Exception {
 156  0
         if (entity instanceof Processor) {
 157  0
             Processor processor = (Processor) entity;
 158  0
             processor.process(exchange);
 159  0
         }
 160  
         else {
 161  
             // TODO add other extension points - eg. passing in Activity
 162  0
             throw new IllegalArgumentException("No processor defined for this route");
 163  
         }
 164  0
     }
 165  
 
 166  
     /**
      * Builds the default find-by-key query from the entity type's class name and
      * the key property name, with the key as positional parameter 1.
      */
     protected String createFindByKeyQuery() {
 167  0
         return "select x from " + getEntityType().getName() + " x where x." + getKeyPropertyName() + " = ?1";
 168  
     }
 169  
 }