Coverage Report - org.apache.camel.component.jpa.JpaConsumer
 
Classes in this File Line Coverage Branch Coverage Complexity
JpaConsumer
69% 
94% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.component.jpa;
 18  
 
 19  
 import java.lang.reflect.Method;
 20  
 import java.util.List;
 21  
 
 22  
 import javax.persistence.EntityManager;
 23  
 import javax.persistence.LockModeType;
 24  
 import javax.persistence.PersistenceException;
 25  
 import javax.persistence.Query;
 26  
 
 27  
 import org.apache.camel.Exchange;
 28  
 import org.apache.camel.Processor;
 29  
 import org.apache.camel.impl.ScheduledPollConsumer;
 30  
 import org.apache.camel.util.ObjectHelper;
 31  
 import org.apache.commons.logging.Log;
 32  
 import org.apache.commons.logging.LogFactory;
 33  
 
 34  
 import org.springframework.orm.jpa.JpaCallback;
 35  
 
 36  
 /**
 37  
  * @version $Revision: 563665 $
 38  
  */
 39  3
 public class JpaConsumer extends ScheduledPollConsumer<Exchange> {
 40  1
     private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class);
 41  
     private final JpaEndpoint endpoint;
 42  
     private final TransactionStrategy template;
 43  
     private QueryFactory queryFactory;
 44  
     private DeleteHandler<Object> deleteHandler;
 45  
     private String query;
 46  
     private String namedQuery;
 47  
     private String nativeQuery;
 48  
 
 49  
     public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
 50  3
         super(endpoint, processor);
 51  3
         this.endpoint = endpoint;
 52  3
         this.template = endpoint.createTransactionStrategy();
 53  3
     }
 54  
 
 55  
     protected void poll() throws Exception {
 56  5
         template.execute(new JpaCallback() {
 57  5
             public Object doInJpa(EntityManager entityManager) throws PersistenceException {
 58  5
                 Query query = getQueryFactory().createQuery(entityManager);
 59  5
                 configureParameters(query);
 60  5
                 List results = query.getResultList();
 61  5
                 for (Object result : results) {
 62  3
                     if (LOG.isDebugEnabled()) {
 63  0
                         LOG.debug("Processing new entity: " + result);
 64  
                     }
 65  
 
 66  3
                     if (lockEntity(result, entityManager)) {
 67  
                         // lets turn the result into an exchange and fire it
 68  
                         // into the processor
 69  3
                         Exchange exchange = createExchange(result);
 70  
                         try {
 71  3
                             getProcessor().process(exchange);
 72  0
                         } catch (Exception e) {
 73  0
                             throw new PersistenceException(e);
 74  3
                         }
 75  3
                         getDeleteHandler().deleteObject(entityManager, result);
 76  
                     }
 77  3
                 }
 78  5
                 entityManager.flush();
 79  5
                 return null;
 80  
             }
 81  
         });
 82  5
     }
 83  
 
 84  
     // Properties
 85  
     // -------------------------------------------------------------------------
 86  
     public JpaEndpoint getEndpoint() {
 87  11
         return endpoint;
 88  
     }
 89  
 
 90  
     public QueryFactory getQueryFactory() {
 91  5
         if (queryFactory == null) {
 92  3
             queryFactory = createQueryFactory();
 93  3
             if (queryFactory == null) {
 94  0
                 throw new IllegalArgumentException("No queryType property configured on this consumer, nor an entityType configured on the endpoint so cannot consume");
 95  
             }
 96  
         }
 97  5
         return queryFactory;
 98  
     }
 99  
 
 100  
     public void setQueryFactory(QueryFactory queryFactory) {
 101  0
         this.queryFactory = queryFactory;
 102  0
     }
 103  
 
 104  
     public DeleteHandler getDeleteHandler() {
 105  3
         if (deleteHandler == null) {
 106  3
             deleteHandler = createDeleteHandler();
 107  
         }
 108  3
         return deleteHandler;
 109  
     }
 110  
 
 111  
     public void setDeleteHandler(DeleteHandler deleteHandler) {
 112  0
         this.deleteHandler = deleteHandler;
 113  0
     }
 114  
 
 115  
     public String getNamedQuery() {
 116  0
         return namedQuery;
 117  
     }
 118  
 
 119  
     public void setNamedQuery(String namedQuery) {
 120  1
         this.namedQuery = namedQuery;
 121  1
     }
 122  
 
 123  
     public String getNativeQuery() {
 124  0
         return nativeQuery;
 125  
     }
 126  
 
 127  
     public void setNativeQuery(String nativeQuery) {
 128  0
         this.nativeQuery = nativeQuery;
 129  0
     }
 130  
 
 131  
     public String getQuery() {
 132  0
         return query;
 133  
     }
 134  
 
 135  
     public void setQuery(String query) {
 136  0
         this.query = query;
 137  0
     }
 138  
 
 139  
     // Implementation methods
 140  
     // -------------------------------------------------------------------------
 141  
 
 142  
     /**
 143  
      * A strategy method to lock an object with an exclusive lock so that it can
 144  
      * be processed
 145  
      * 
 146  
      * @param entity the entity to be locked
 147  
      * @param entityManager
 148  
      * @return true if the entity was locked
 149  
      */
 150  
     protected boolean lockEntity(Object entity, EntityManager entityManager) {
 151  3
         if (!getEndpoint().isConsumeDelete() || !getEndpoint().isConsumeLockEntity()) {
 152  0
             return true;
 153  
         }
 154  
         try {
 155  3
             if (LOG.isDebugEnabled()) {
 156  0
                 LOG.debug("Acquiring exclusive lock on entity: " + entity);
 157  
             }
 158  3
             entityManager.lock(entity, LockModeType.WRITE);
 159  3
             return true;
 160  0
         } catch (Exception e) {
 161  0
             if (LOG.isDebugEnabled()) {
 162  0
                 LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
 163  
             }
 164  0
             return false;
 165  
         }
 166  
     }
 167  
 
 168  
     protected QueryFactory createQueryFactory() {
 169  3
         if (query != null) {
 170  0
             return QueryBuilder.query(query);
 171  3
         } else if (namedQuery != null) {
 172  1
             return QueryBuilder.namedQuery(namedQuery);
 173  2
         } else if (nativeQuery != null) {
 174  0
             return QueryBuilder.nativeQuery(nativeQuery);
 175  
         } else {
 176  2
             Class<?> entityType = endpoint.getEntityType();
 177  2
             if (entityType == null) {
 178  0
                 return null;
 179  
             } else {
 180  2
                 return QueryBuilder.query("select x from " + entityType.getName() + " x");
 181  
             }
 182  
         }
 183  
     }
 184  
 
 185  
     protected DeleteHandler<Object> createDeleteHandler() {
 186  
         // TODO auto-discover an annotation in the entity bean to indicate the
 187  
         // process completed method call?
 188  3
         Class<?> entityType = getEndpoint().getEntityType();
 189  3
         if (entityType != null) {
 190  3
             List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, Consumed.class);
 191  3
             if (methods.size() > 1) {
 192  0
                 throw new IllegalArgumentException("Only one method can be annotated with the @Consumed annotation but found: " + methods);
 193  3
             } else if (methods.size() == 1) {
 194  1
                 final Method method = methods.get(0);
 195  
 
 196  1
                 return new DeleteHandler<Object>() {
 197  1
                     public void deleteObject(EntityManager entityManager, Object entityBean) {
 198  1
                         ObjectHelper.invokeMethod(method, entityBean);
 199  1
                     }
 200  
                 };
 201  
             }
 202  
         }
 203  2
         if (getEndpoint().isConsumeDelete()) {
 204  2
             return new DeleteHandler<Object>() {
 205  2
                 public void deleteObject(EntityManager entityManager, Object entityBean) {
 206  2
                     entityManager.remove(entityBean);
 207  2
                 }
 208  
             };
 209  
         } else {
 210  0
             return new DeleteHandler<Object>() {
 211  0
                 public void deleteObject(EntityManager entityManager, Object entityBean) {
 212  
                     // do nothing
 213  0
                 }
 214  
             };
 215  
         }
 216  
     }
 217  
 
 218  
     protected void configureParameters(Query query) {
 219  5
         int maxResults = endpoint.getMaximumResults();
 220  5
         if (maxResults > 0) {
 221  0
             query.setMaxResults(maxResults);
 222  
         }
 223  5
     }
 224  
 
 225  
     protected Exchange createExchange(Object result) {
 226  3
         Exchange exchange = endpoint.createExchange();
 227  3
         exchange.getIn().setBody(result);
 228  3
         return exchange;
 229  
     }
 230  
 }