001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.Method;
020    
021    import javax.xml.bind.annotation.XmlTransient;
022    
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.CamelContextAware;
025    import org.apache.camel.Consume;
026    import org.apache.camel.Consumer;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.PollingConsumer;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.camel.ProducerTemplate;
032    import org.apache.camel.Service;
033    import org.apache.camel.component.bean.BeanProcessor;
034    import org.apache.camel.component.bean.ProxyHelper;
035    import org.apache.camel.util.CamelContextHelper;
036    import org.apache.camel.util.ObjectHelper;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A helper class for Camel based injector or post processing hooks which can be reused by
042     * both the <a href="http://camel.apache.org/spring.html">Spring</a>
043     * and <a href="http://camel.apache.org/guice.html">Guice</a> support.
044     *
045     * @version $Revision: 776194 $
046     */
047    public class CamelPostProcessorHelper implements CamelContextAware {
048        private static final transient Log LOG = LogFactory.getLog(CamelPostProcessorHelper.class);
049    
050        @XmlTransient
051        private CamelContext camelContext;
052    
053        public CamelPostProcessorHelper() {
054        }
055    
056        public CamelPostProcessorHelper(CamelContext camelContext) {
057            this.setCamelContext(camelContext);
058        }
059    
060        public CamelContext getCamelContext() {
061            return camelContext;
062        }
063    
064        public void setCamelContext(CamelContext camelContext) {
065            this.camelContext = camelContext;
066        }
067    
068        public void consumerInjection(Method method, Object bean) {
069            Consume consume = method.getAnnotation(Consume.class);
070            if (consume != null) {
071                LOG.info("Creating a consumer for: " + consume);
072                subscribeMethod(method, bean, consume.uri(), consume.ref());
073            }
074        }
075    
076        public void subscribeMethod(Method method, Object bean, String endpointUri, String endpointName) {
077            // lets bind this method to a listener
078            String injectionPointName = method.getName();
079            Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
080            if (endpoint != null) {
081                try {
082                    Processor processor = createConsumerProcessor(bean, method, endpoint);
083                    Consumer consumer = endpoint.createConsumer(processor);
084                    if (LOG.isDebugEnabled()) {
085                        LOG.debug("Created processor: " + processor + " for consumer: " + consumer);
086                    }
087                    startService(consumer);
088                } catch (Exception e) {
089                    throw ObjectHelper.wrapRuntimeCamelException(e);
090                }
091            }
092        }
093    
094        public void startService(Service service) throws Exception {
095            CamelContext camelContext = getCamelContext();
096            if (camelContext instanceof DefaultCamelContext) {
097                DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
098                defaultCamelContext.addService(service);
099            } else {
100                service.start();
101            }
102        }
103    
104        /**
105         * Create a processor which invokes the given method when an incoming
106         * message exchange is received
107         */
108        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
109            BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
110            answer.setMethodObject(method);
111            return answer;
112        }
113    
114        protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
115            return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
116        }
117    
118        /**
119         * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
120         */
121        public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName) {
122            if (type.isAssignableFrom(ProducerTemplate.class)) {
123                return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName);
124            } else {
125                Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
126                if (endpoint != null) {
127                    if (type.isInstance(endpoint)) {
128                        return endpoint;
129                    } else if (type.isAssignableFrom(Producer.class)) {
130                        return createInjectionProducer(endpoint);
131                    } else if (type.isAssignableFrom(PollingConsumer.class)) {
132                        return createInjectionPollingConsumer(endpoint);
133                    } else if (type.isInterface()) {
134                        // lets create a proxy
135                        try {
136                            return ProxyHelper.createProxy(endpoint, type);
137                        } catch (Exception e) {
138                            throw createProxyInstantiationRuntimeException(type, endpoint, e);
139                        }
140                    } else {
141                        throw new IllegalArgumentException("Invalid type: " + type.getName()
142                                + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
143                    }
144                }
145                return null;
146            }
147        }
148    
149        /**
150         * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
151         */
152        protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
153            // endpoint is optional for this injection point
154            Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
155            return new DefaultProducerTemplate(getCamelContext(), endpoint);
156        }
157    
158        /**
159         * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
160         */
161        protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) {
162            try {
163                PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
164                startService(pollingConsumer);
165                return pollingConsumer;
166            } catch (Exception e) {
167                throw ObjectHelper.wrapRuntimeCamelException(e);
168            }
169        }
170    
171        /**
172         * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
173         */
174        protected Producer createInjectionProducer(Endpoint endpoint) {
175            try {
176                Producer producer = endpoint.createProducer();
177                startService(producer);
178                return producer;
179            } catch (Exception e) {
180                throw ObjectHelper.wrapRuntimeCamelException(e);
181            }
182        }
183    
184        protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
185            return new ProxyInstantiationException(type, endpoint, e);
186        }
187    }