001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.spring;
018    
019    import java.lang.reflect.Field;
020    import java.lang.reflect.Method;
021    
022    import javax.xml.bind.annotation.XmlAccessType;
023    import javax.xml.bind.annotation.XmlAccessorType;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.CamelContextAware;
028    import org.apache.camel.CamelTemplate;
029    import org.apache.camel.Consumer;
030    import org.apache.camel.Endpoint;
031    import org.apache.camel.EndpointInject;
032    import org.apache.camel.MessageDriven;
033    import org.apache.camel.Processor;
034    import org.apache.camel.Producer;
035    import org.apache.camel.RuntimeCamelException;
036    import org.apache.camel.component.bean.BeanProcessor;
037    import org.apache.camel.spring.util.ReflectionUtils;
038    import org.apache.camel.util.ObjectHelper;
039    import org.apache.commons.logging.Log;
040    import org.apache.commons.logging.LogFactory;
041    
042    import org.springframework.beans.BeansException;
043    import org.springframework.beans.factory.NoSuchBeanDefinitionException;
044    import org.springframework.beans.factory.config.BeanPostProcessor;
045    import org.springframework.context.ApplicationContext;
046    import org.springframework.context.ApplicationContextAware;
047    
048    import static org.apache.camel.util.ObjectHelper.isNotNullAndNonEmpty;
049    
050    /**
051     * A post processor to perform injection of {@link Endpoint} and
052     * {@link Producer} instances together with binding methods annotated with
053     * {@link @MessageDriven} to a Camel consumer.
054     * 
055     * @version $Revision: 1.1 $
056     */
057    @XmlRootElement(name = "beanPostProcessor")
058    @XmlAccessorType(XmlAccessType.FIELD)
059    public class CamelBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
060        private static final transient Log LOG = LogFactory.getLog(CamelBeanPostProcessor.class);
061        @XmlTransient
062        private SpringCamelContext camelContext;
063        @XmlTransient
064        private ApplicationContext applicationContext;
065    
066        // private List<Consumer> consumers = new ArrayList<Consumer>();
067    
068        public CamelBeanPostProcessor() {
069        }
070    
071        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
072            injectFields(bean);
073            injectMethods(bean);
074            if (bean instanceof CamelContextAware) {
075                CamelContextAware contextAware = (CamelContextAware)bean;
076                if (camelContext == null) {
077                    LOG.warn("No CamelContext defined yet so cannot inject into: " + bean);
078                } else {
079                    contextAware.setCamelContext(camelContext);
080                }
081            }
082            return bean;
083        }
084    
085        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
086            return bean;
087        }
088    
089        // Properties
090        // -------------------------------------------------------------------------
091    
092        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
093            this.applicationContext = applicationContext;
094        }
095    
096        public SpringCamelContext getCamelContext() {
097            return camelContext;
098        }
099    
100        public void setCamelContext(SpringCamelContext camelContext) {
101            this.camelContext = camelContext;
102        }
103    
104        // Implementation methods
105        // -------------------------------------------------------------------------
106    
107        /**
108         * A strategy method to allow implementations to perform some custom JBI
109         * based injection of the POJO
110         * 
111         * @param bean the bean to be injected
112         */
113        protected void injectFields(final Object bean) {
114            ReflectionUtils.doWithFields(bean.getClass(), new ReflectionUtils.FieldCallback() {
115                public void doWith(Field field) throws IllegalArgumentException, IllegalAccessException {
116                    EndpointInject annotation = field.getAnnotation(EndpointInject.class);
117                    if (annotation != null) {
118                        ReflectionUtils.setField(field, bean, getEndpointInjectionValue(annotation, field.getType()));
119                    }
120                }
121            });
122        }
123    
124        protected void injectMethods(final Object bean) {
125            ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
126                @SuppressWarnings("unchecked")
127                public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
128                    setterInjection(method, bean);
129                    consumerInjection(method, bean);
130                }
131            });
132        }
133    
134        protected void setterInjection(Method method, Object bean) {
135            EndpointInject annoation = method.getAnnotation(EndpointInject.class);
136            if (annoation != null) {
137                Class<?>[] parameterTypes = method.getParameterTypes();
138                if (parameterTypes != null) {
139                    if (parameterTypes.length != 1) {
140                        LOG.warn("Ignoring badly annotated method for injection due to incorrect number of parameters: " + method);
141                    } else {
142                        Object value = getEndpointInjectionValue(annoation, parameterTypes[0]);
143                        ObjectHelper.invokeMethod(method, bean, value);
144                    }
145                }
146            }
147        }
148    
149        protected void consumerInjection(final Object bean) {
150            ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
151                @SuppressWarnings("unchecked")
152                public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
153                    /*
154                     * TODO support callbacks? if
155                     * (method.getAnnotation(Callback.class) != null) { try {
156                     * Expression e = ExpressionFactory.createExpression(
157                     * method.getAnnotation(Callback.class).condition());
158                     * JexlContext jc = JexlHelper.createContext();
159                     * jc.getVars().put("this", obj); Object r = e.evaluate(jc); if
160                     * (!(r instanceof Boolean)) { throw new
161                     * RuntimeException("Expression did not returned a boolean value
162                     * but: " + r); } Boolean oldVal =
163                     * req.getCallbacks().get(method); Boolean newVal = (Boolean) r;
164                     * if ((oldVal == null || !oldVal) && newVal) {
165                     * req.getCallbacks().put(method, newVal); method.invoke(obj,
166                     * new Object[0]); // TODO: handle return value and sent it as
167                     * the answer } } catch (Exception e) { throw new
168                     * RuntimeException("Unable to invoke callback", e); } }
169                     */
170                }
171            });
172        }
173    
174        protected void consumerInjection(Method method, Object bean) {
175            MessageDriven annotation = method.getAnnotation(MessageDriven.class);
176            if (annotation != null) {
177                LOG.info("Creating a consumer for: " + annotation);
178    
179                // lets bind this method to a listener
180                Endpoint endpoint = getEndpointInjection(annotation.uri(), annotation.name());
181                if (endpoint != null) {
182                    try {
183                        Processor processor = createConsumerProcessor(bean, method, endpoint);
184                        LOG.info("Created processor: " + processor);
185                        Consumer consumer = endpoint.createConsumer(processor);
186                        consumer.start();
187                        addConsumer(consumer);
188                    } catch (Exception e) {
189                        LOG.warn(e);
190                        throw new RuntimeCamelException(e);
191                    }
192                }
193            }
194        }
195    
196        /**
197         * Create a processor which invokes the given method when an incoming
198         * message exchange is received
199         */
200        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
201            BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
202            answer.setMethod(method);
203            return answer;
204        }
205    
206        protected void addConsumer(Consumer consumer) {
207            LOG.debug("Adding consumer: " + consumer);
208            // consumers.add(consumer);
209        }
210    
211        /**
212         * Creates the value for the injection point for the given annotation
213         */
214        protected Object getEndpointInjectionValue(EndpointInject annotation, Class<?> type) {
215            Endpoint endpoint = getEndpointInjection(annotation.uri(), annotation.name());
216            if (endpoint != null) {
217                if (type.isInstance(endpoint)) {
218                    return endpoint;
219                } else if (type.isAssignableFrom(Producer.class)) {
220                    try {
221                        return endpoint.createProducer();
222                    } catch (Exception e) {
223                        throw new RuntimeCamelException(e);
224                    }
225                } else if (type.isAssignableFrom(CamelTemplate.class)) {
226                    return new CamelTemplate(getCamelContext(), endpoint);
227                }
228            }
229            return null;
230        }
231    
232        protected Endpoint getEndpointInjection(String uri, String name) {
233            Endpoint endpoint = null;
234            if (isNotNullAndNonEmpty(uri)) {
235                endpoint = camelContext.getEndpoint(uri);
236            } else {
237                if (isNotNullAndNonEmpty(name)) {
238                    endpoint = (Endpoint)applicationContext.getBean(name);
239                    if (endpoint == null) {
240                        throw new NoSuchBeanDefinitionException(name);
241                    }
242                } else {
243                    LOG.warn("No uri or name specified on @EndpointInject annotation!");
244                }
245            }
246            return endpoint;
247        }
248    
249    }