001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.jpa;
018    
019    import java.lang.reflect.Method;
020    import java.util.List;
021    
022    import javax.persistence.EntityManager;
023    import javax.persistence.LockModeType;
024    import javax.persistence.PersistenceException;
025    import javax.persistence.Query;
026    
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Processor;
029    import org.apache.camel.impl.ScheduledPollConsumer;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    import org.springframework.orm.jpa.JpaCallback;
035    
036    /**
037     * @version $Revision: 563665 $
038     */
039    public class JpaConsumer extends ScheduledPollConsumer<Exchange> {
040        private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class);
041        private final JpaEndpoint endpoint;
042        private final TransactionStrategy template;
043        private QueryFactory queryFactory;
044        private DeleteHandler<Object> deleteHandler;
045        private String query;
046        private String namedQuery;
047        private String nativeQuery;
048    
049        public JpaConsumer(JpaEndpoint endpoint, Processor processor) {
050            super(endpoint, processor);
051            this.endpoint = endpoint;
052            this.template = endpoint.createTransactionStrategy();
053        }
054    
055        protected void poll() throws Exception {
056            template.execute(new JpaCallback() {
057                public Object doInJpa(EntityManager entityManager) throws PersistenceException {
058                    Query query = getQueryFactory().createQuery(entityManager);
059                    configureParameters(query);
060                    List results = query.getResultList();
061                    for (Object result : results) {
062                        if (LOG.isDebugEnabled()) {
063                            LOG.debug("Processing new entity: " + result);
064                        }
065    
066                        if (lockEntity(result, entityManager)) {
067                            // lets turn the result into an exchange and fire it
068                            // into the processor
069                            Exchange exchange = createExchange(result);
070                            try {
071                                getProcessor().process(exchange);
072                            } catch (Exception e) {
073                                throw new PersistenceException(e);
074                            }
075                            getDeleteHandler().deleteObject(entityManager, result);
076                        }
077                    }
078                    entityManager.flush();
079                    return null;
080                }
081            });
082        }
083    
084        // Properties
085        // -------------------------------------------------------------------------
086        public JpaEndpoint getEndpoint() {
087            return endpoint;
088        }
089    
090        public QueryFactory getQueryFactory() {
091            if (queryFactory == null) {
092                queryFactory = createQueryFactory();
093                if (queryFactory == null) {
094                    throw new IllegalArgumentException("No queryType property configured on this consumer, nor an entityType configured on the endpoint so cannot consume");
095                }
096            }
097            return queryFactory;
098        }
099    
100        public void setQueryFactory(QueryFactory queryFactory) {
101            this.queryFactory = queryFactory;
102        }
103    
104        public DeleteHandler getDeleteHandler() {
105            if (deleteHandler == null) {
106                deleteHandler = createDeleteHandler();
107            }
108            return deleteHandler;
109        }
110    
111        public void setDeleteHandler(DeleteHandler deleteHandler) {
112            this.deleteHandler = deleteHandler;
113        }
114    
115        public String getNamedQuery() {
116            return namedQuery;
117        }
118    
119        public void setNamedQuery(String namedQuery) {
120            this.namedQuery = namedQuery;
121        }
122    
123        public String getNativeQuery() {
124            return nativeQuery;
125        }
126    
127        public void setNativeQuery(String nativeQuery) {
128            this.nativeQuery = nativeQuery;
129        }
130    
131        public String getQuery() {
132            return query;
133        }
134    
135        public void setQuery(String query) {
136            this.query = query;
137        }
138    
139        // Implementation methods
140        // -------------------------------------------------------------------------
141    
142        /**
143         * A strategy method to lock an object with an exclusive lock so that it can
144         * be processed
145         * 
146         * @param entity the entity to be locked
147         * @param entityManager
148         * @return true if the entity was locked
149         */
150        protected boolean lockEntity(Object entity, EntityManager entityManager) {
151            if (!getEndpoint().isConsumeDelete() || !getEndpoint().isConsumeLockEntity()) {
152                return true;
153            }
154            try {
155                if (LOG.isDebugEnabled()) {
156                    LOG.debug("Acquiring exclusive lock on entity: " + entity);
157                }
158                entityManager.lock(entity, LockModeType.WRITE);
159                return true;
160            } catch (Exception e) {
161                if (LOG.isDebugEnabled()) {
162                    LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e);
163                }
164                return false;
165            }
166        }
167    
168        protected QueryFactory createQueryFactory() {
169            if (query != null) {
170                return QueryBuilder.query(query);
171            } else if (namedQuery != null) {
172                return QueryBuilder.namedQuery(namedQuery);
173            } else if (nativeQuery != null) {
174                return QueryBuilder.nativeQuery(nativeQuery);
175            } else {
176                Class<?> entityType = endpoint.getEntityType();
177                if (entityType == null) {
178                    return null;
179                } else {
180                    return QueryBuilder.query("select x from " + entityType.getName() + " x");
181                }
182            }
183        }
184    
185        protected DeleteHandler<Object> createDeleteHandler() {
186            // TODO auto-discover an annotation in the entity bean to indicate the
187            // process completed method call?
188            Class<?> entityType = getEndpoint().getEntityType();
189            if (entityType != null) {
190                List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, Consumed.class);
191                if (methods.size() > 1) {
192                    throw new IllegalArgumentException("Only one method can be annotated with the @Consumed annotation but found: " + methods);
193                } else if (methods.size() == 1) {
194                    final Method method = methods.get(0);
195    
196                    return new DeleteHandler<Object>() {
197                        public void deleteObject(EntityManager entityManager, Object entityBean) {
198                            ObjectHelper.invokeMethod(method, entityBean);
199                        }
200                    };
201                }
202            }
203            if (getEndpoint().isConsumeDelete()) {
204                return new DeleteHandler<Object>() {
205                    public void deleteObject(EntityManager entityManager, Object entityBean) {
206                        entityManager.remove(entityBean);
207                    }
208                };
209            } else {
210                return new DeleteHandler<Object>() {
211                    public void deleteObject(EntityManager entityManager, Object entityBean) {
212                        // do nothing
213                    }
214                };
215            }
216        }
217    
218        protected void configureParameters(Query query) {
219            int maxResults = endpoint.getMaximumResults();
220            if (maxResults > 0) {
221                query.setMaxResults(maxResults);
222            }
223        }
224    
225        protected Exchange createExchange(Object result) {
226            Exchange exchange = endpoint.createExchange();
227            exchange.getIn().setBody(result);
228            return exchange;
229        }
230    }