1 |
|
|
2 |
|
|
3 |
|
|
4 |
|
|
5 |
|
|
6 |
|
|
7 |
|
|
8 |
|
|
9 |
|
|
10 |
|
|
11 |
|
|
12 |
|
|
13 |
|
|
14 |
|
|
15 |
|
|
16 |
|
|
17 |
|
package org.apache.camel.component.jpa; |
18 |
|
|
19 |
|
import java.lang.reflect.Method; |
20 |
|
import java.util.List; |
21 |
|
|
22 |
|
import javax.persistence.EntityManager; |
23 |
|
import javax.persistence.LockModeType; |
24 |
|
import javax.persistence.PersistenceException; |
25 |
|
import javax.persistence.Query; |
26 |
|
|
27 |
|
import org.apache.camel.Exchange; |
28 |
|
import org.apache.camel.Processor; |
29 |
|
import org.apache.camel.impl.ScheduledPollConsumer; |
30 |
|
import org.apache.camel.util.ObjectHelper; |
31 |
|
import org.apache.commons.logging.Log; |
32 |
|
import org.apache.commons.logging.LogFactory; |
33 |
|
|
34 |
|
import org.springframework.orm.jpa.JpaCallback; |
35 |
|
|
36 |
|
|
37 |
|
|
38 |
|
|
39 |
3 |
public class JpaConsumer extends ScheduledPollConsumer<Exchange> { |
40 |
1 |
private static final transient Log LOG = LogFactory.getLog(JpaConsumer.class); |
41 |
|
private final JpaEndpoint endpoint; |
42 |
|
private final TransactionStrategy template; |
43 |
|
private QueryFactory queryFactory; |
44 |
|
private DeleteHandler<Object> deleteHandler; |
45 |
|
private String query; |
46 |
|
private String namedQuery; |
47 |
|
private String nativeQuery; |
48 |
|
|
49 |
|
public JpaConsumer(JpaEndpoint endpoint, Processor processor) { |
50 |
3 |
super(endpoint, processor); |
51 |
3 |
this.endpoint = endpoint; |
52 |
3 |
this.template = endpoint.createTransactionStrategy(); |
53 |
3 |
} |
54 |
|
|
55 |
|
protected void poll() throws Exception { |
56 |
5 |
template.execute(new JpaCallback() { |
57 |
5 |
public Object doInJpa(EntityManager entityManager) throws PersistenceException { |
58 |
5 |
Query query = getQueryFactory().createQuery(entityManager); |
59 |
5 |
configureParameters(query); |
60 |
5 |
List results = query.getResultList(); |
61 |
5 |
for (Object result : results) { |
62 |
3 |
if (LOG.isDebugEnabled()) { |
63 |
0 |
LOG.debug("Processing new entity: " + result); |
64 |
|
} |
65 |
|
|
66 |
3 |
if (lockEntity(result, entityManager)) { |
67 |
|
|
68 |
|
|
69 |
3 |
Exchange exchange = createExchange(result); |
70 |
|
try { |
71 |
3 |
getProcessor().process(exchange); |
72 |
0 |
} catch (Exception e) { |
73 |
0 |
throw new PersistenceException(e); |
74 |
3 |
} |
75 |
3 |
getDeleteHandler().deleteObject(entityManager, result); |
76 |
|
} |
77 |
3 |
} |
78 |
5 |
entityManager.flush(); |
79 |
5 |
return null; |
80 |
|
} |
81 |
|
}); |
82 |
5 |
} |
83 |
|
|
84 |
|
|
85 |
|
|
86 |
|
public JpaEndpoint getEndpoint() { |
87 |
11 |
return endpoint; |
88 |
|
} |
89 |
|
|
90 |
|
public QueryFactory getQueryFactory() { |
91 |
5 |
if (queryFactory == null) { |
92 |
3 |
queryFactory = createQueryFactory(); |
93 |
3 |
if (queryFactory == null) { |
94 |
0 |
throw new IllegalArgumentException("No queryType property configured on this consumer, nor an entityType configured on the endpoint so cannot consume"); |
95 |
|
} |
96 |
|
} |
97 |
5 |
return queryFactory; |
98 |
|
} |
99 |
|
|
100 |
|
public void setQueryFactory(QueryFactory queryFactory) { |
101 |
0 |
this.queryFactory = queryFactory; |
102 |
0 |
} |
103 |
|
|
104 |
|
public DeleteHandler getDeleteHandler() { |
105 |
3 |
if (deleteHandler == null) { |
106 |
3 |
deleteHandler = createDeleteHandler(); |
107 |
|
} |
108 |
3 |
return deleteHandler; |
109 |
|
} |
110 |
|
|
111 |
|
public void setDeleteHandler(DeleteHandler deleteHandler) { |
112 |
0 |
this.deleteHandler = deleteHandler; |
113 |
0 |
} |
114 |
|
|
115 |
|
public String getNamedQuery() { |
116 |
0 |
return namedQuery; |
117 |
|
} |
118 |
|
|
119 |
|
public void setNamedQuery(String namedQuery) { |
120 |
1 |
this.namedQuery = namedQuery; |
121 |
1 |
} |
122 |
|
|
123 |
|
public String getNativeQuery() { |
124 |
0 |
return nativeQuery; |
125 |
|
} |
126 |
|
|
127 |
|
public void setNativeQuery(String nativeQuery) { |
128 |
0 |
this.nativeQuery = nativeQuery; |
129 |
0 |
} |
130 |
|
|
131 |
|
public String getQuery() { |
132 |
0 |
return query; |
133 |
|
} |
134 |
|
|
135 |
|
public void setQuery(String query) { |
136 |
0 |
this.query = query; |
137 |
0 |
} |
138 |
|
|
139 |
|
|
140 |
|
|
141 |
|
|
142 |
|
|
143 |
|
|
144 |
|
|
145 |
|
|
146 |
|
|
147 |
|
|
148 |
|
|
149 |
|
|
150 |
|
protected boolean lockEntity(Object entity, EntityManager entityManager) { |
151 |
3 |
if (!getEndpoint().isConsumeDelete() || !getEndpoint().isConsumeLockEntity()) { |
152 |
0 |
return true; |
153 |
|
} |
154 |
|
try { |
155 |
3 |
if (LOG.isDebugEnabled()) { |
156 |
0 |
LOG.debug("Acquiring exclusive lock on entity: " + entity); |
157 |
|
} |
158 |
3 |
entityManager.lock(entity, LockModeType.WRITE); |
159 |
3 |
return true; |
160 |
0 |
} catch (Exception e) { |
161 |
0 |
if (LOG.isDebugEnabled()) { |
162 |
0 |
LOG.debug("Failed to achieve lock on entity: " + entity + ". Reason: " + e, e); |
163 |
|
} |
164 |
0 |
return false; |
165 |
|
} |
166 |
|
} |
167 |
|
|
168 |
|
protected QueryFactory createQueryFactory() { |
169 |
3 |
if (query != null) { |
170 |
0 |
return QueryBuilder.query(query); |
171 |
3 |
} else if (namedQuery != null) { |
172 |
1 |
return QueryBuilder.namedQuery(namedQuery); |
173 |
2 |
} else if (nativeQuery != null) { |
174 |
0 |
return QueryBuilder.nativeQuery(nativeQuery); |
175 |
|
} else { |
176 |
2 |
Class<?> entityType = endpoint.getEntityType(); |
177 |
2 |
if (entityType == null) { |
178 |
0 |
return null; |
179 |
|
} else { |
180 |
2 |
return QueryBuilder.query("select x from " + entityType.getName() + " x"); |
181 |
|
} |
182 |
|
} |
183 |
|
} |
184 |
|
|
185 |
|
protected DeleteHandler<Object> createDeleteHandler() { |
186 |
|
|
187 |
|
|
188 |
3 |
Class<?> entityType = getEndpoint().getEntityType(); |
189 |
3 |
if (entityType != null) { |
190 |
3 |
List<Method> methods = ObjectHelper.findMethodsWithAnnotation(entityType, Consumed.class); |
191 |
3 |
if (methods.size() > 1) { |
192 |
0 |
throw new IllegalArgumentException("Only one method can be annotated with the @Consumed annotation but found: " + methods); |
193 |
3 |
} else if (methods.size() == 1) { |
194 |
1 |
final Method method = methods.get(0); |
195 |
|
|
196 |
1 |
return new DeleteHandler<Object>() { |
197 |
1 |
public void deleteObject(EntityManager entityManager, Object entityBean) { |
198 |
1 |
ObjectHelper.invokeMethod(method, entityBean); |
199 |
1 |
} |
200 |
|
}; |
201 |
|
} |
202 |
|
} |
203 |
2 |
if (getEndpoint().isConsumeDelete()) { |
204 |
2 |
return new DeleteHandler<Object>() { |
205 |
2 |
public void deleteObject(EntityManager entityManager, Object entityBean) { |
206 |
2 |
entityManager.remove(entityBean); |
207 |
2 |
} |
208 |
|
}; |
209 |
|
} else { |
210 |
0 |
return new DeleteHandler<Object>() { |
211 |
0 |
public void deleteObject(EntityManager entityManager, Object entityBean) { |
212 |
|
|
213 |
0 |
} |
214 |
|
}; |
215 |
|
} |
216 |
|
} |
217 |
|
|
218 |
|
protected void configureParameters(Query query) { |
219 |
5 |
int maxResults = endpoint.getMaximumResults(); |
220 |
5 |
if (maxResults > 0) { |
221 |
0 |
query.setMaxResults(maxResults); |
222 |
|
} |
223 |
5 |
} |
224 |
|
|
225 |
|
protected Exchange createExchange(Object result) { |
226 |
3 |
Exchange exchange = endpoint.createExchange(); |
227 |
3 |
exchange.getIn().setBody(result); |
228 |
3 |
return exchange; |
229 |
|
} |
230 |
|
} |