001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.jpa; 018 019 import java.lang.reflect.Method; 020 import java.util.List; 021 022 import javax.persistence.EntityManager; 023 import javax.persistence.LockModeType; 024 import javax.persistence.PersistenceException; 025 import javax.persistence.Query; 026 027 import org.apache.camel.Exchange; 028 import org.apache.camel.Processor; 029 import org.apache.camel.impl.ScheduledPollConsumer; 030 import org.apache.camel.util.ObjectHelper; 031 import org.apache.commons.logging.Log; 032 import org.apache.commons.logging.LogFactory; 033 034 import org.springframework.orm.jpa.JpaCallback; 035 036 /** 037 * @version $Revision: 563665 $ 038 */ 039 public class JpaConsumer extends ScheduledPollConsumer<Exchange> { 040 private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class); 041 private final JpaEndpoint endpoint; 042 private final TransactionStrategy template; 043 private QueryFactory queryFactory; 044 private DeleteHandler<Object> deleteHandler; 045 private String query; 046 private String namedQuery; 047 private String nativeQuery; 048 049 public JpaConsumer(JpaEndpoint endpoint, Processor processor) { 050 super(endpoint, processor); 051 this.endpoint = endpoint; 052 this.template = endpoint.createTransactionStrategy(); 053 } 054 055 protected void poll() throws Exception { 056 template.execute(new JpaCallback() { 057 public Object doInJpa(EntityManager entityManager) throws PersistenceException { 058 Query query = getQueryFactory().createQuery(entityManager); 059 configureParameters(query); 060 List results = query.getResultList(); 061 for (Object result : results) { 062 if (LOG.isDebugEnabled()) { 063 LOG.debug("Processing new entity: " + result); 064 } 065 066 if (lockEntity(result, entityManager)) { 067 // lets turn the result into an exchange and fire it 068 // into the processor 069 Exchange exchange = createExchange(result); 070 try { 071 getProcessor().process(exchange); 072 } catch (Exception e) { 073 throw new PersistenceException(e); 074 } 075 getDeleteHandler().deleteObject(entityManager, result); 076 } 077 } 078 entityManager.flush(); 079 return null; 080 } 081 }); 082 } 083 084 // Properties 085 // ------------------------------------------------------------------------- 086 public JpaEndpoint getEndpoint() { 087 return endpoint; 088 } 089 090 public QueryFactory getQueryFactory() { 091 if (queryFactory == null) { 092 queryFactory = createQueryFactory(); 093 if (queryFactory == null) { 094 throw new IllegalArgumentException("No queryType property configured on this consumer, nor an entityType configured on the endpoint so cannot consume"); 095 } 096 } 097 return queryFactory; 098 } 099 100 public void setQueryFactory(QueryFactory queryFactory) { 101 this.queryFactory = queryFactory; 102 } 103 104 public DeleteHandler getDeleteHandler() { 105 if (deleteHandler == null) { 106 deleteHandler = createDeleteHandler(); 107 } 108 return deleteHandler; 109 } 110 111 public void setDeleteHandler(DeleteHandler deleteHandler) { 112 this.deleteHandler = deleteHandler; 113 } 114 115 public String getNamedQuery() { 116 return namedQuery; 117 } 118 119 public void setNamedQuery(String namedQuery) { 120 this.namedQuery = namedQuery; 121 } 122 123 public String getNativeQuery() { 124 return nativeQuery; 125 } 126 127 public void setNativeQuery(String nativeQuery) { 128 this.nativeQuery = nativeQuery; 129 } 130 131 public String getQuery() { 132 return query; 133 } 134 135 public void setQuery(String query) { 136 this.query = query; 137 } 138 139 // Implementation methods 140 // ------------------------------------------------------------------------- 141 142 /** 143 * A strategy method to lock an object with an exclusive lock so that it can 144 * be processed 145 * 146 * @param entity the entity to be locked 147 * @param entityManager 148 * @return true if the entity was locked 149 */ 150 protected boolean lockEntity(Object entity, EntityManager entityManager) { 151 if (!getEndpoint().isConsumeDelete() || !getEndpoint().isConsumeLockEntity()) { 152 return true; 153 } 154 try { 155 if (LOG.isDebugEnabled()) { 156 LOG.debug("Acquiring exclusive lock on entity: " + entity); 157 } 158 entityManager.lock(entity, LockModeType.WRITE); 159 return true; 160 } catch (Exception e) { 161 if (LOG.isDebugEnabled()) { 162 LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e); 163 } 164 return false; 165 } 166 } 167 168 protected QueryFactory createQueryFactory() { 169 if (query != null) { 170 return QueryBuilder.query(query); 171 } else if (namedQuery != null) { 172 return QueryBuilder.namedQuery(namedQuery); 173 } else if (nativeQuery != null) { 174 return QueryBuilder.nativeQuery(nativeQuery); 175 } else { 176 Class<?> entityType = endpoint.getEntityType(); 177 if (entityType == null) { 178 return null; 179 } else { 180 return QueryBuilder.query("select x from " + entityType.getName() + " x"); 181 } 182 } 183 } 184 185 protected DeleteHandler<Object> createDeleteHandler() { 186 // TODO auto-discover an annotation in the entity bean to indicate the 187 // process completed method call? 188 Class<?> entityType = getEndpoint().getEntityType(); 189 if (entityType != null) { 190 List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, Consumed.class); 191 if (methods.size() > 1) { 192 throw new IllegalArgumentException("Only one method can be annotated with the @Consumed annotation but found: " + methods); 193 } else if (methods.size() == 1) { 194 final Method method = methods.get(0); 195 196 return new DeleteHandler<Object>() { 197 public void deleteObject(EntityManager entityManager, Object entityBean) { 198 ObjectHelper.invokeMethod(method, entityBean); 199 } 200 }; 201 } 202 } 203 if (getEndpoint().isConsumeDelete()) { 204 return new DeleteHandler<Object>() { 205 public void deleteObject(EntityManager entityManager, Object entityBean) { 206 entityManager.remove(entityBean); 207 } 208 }; 209 } else { 210 return new DeleteHandler<Object>() { 211 public void deleteObject(EntityManager entityManager, Object entityBean) { 212 // do nothing 213 } 214 }; 215 } 216 } 217 218 protected void configureParameters(Query query) { 219 int maxResults = endpoint.getMaximumResults(); 220 if (maxResults > 0) { 221 query.setMaxResults(maxResults); 222 } 223 } 224 225 protected Exchange createExchange(Object result) { 226 Exchange exchange = endpoint.createExchange(); 227 exchange.getIn().setBody(result); 228 return exchange; 229 } 230 }