001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.Method;
020    
021    import javax.xml.bind.annotation.XmlTransient;
022    
023    import org.apache.camel.CamelContext;
024    import org.apache.camel.CamelContextAware;
025    import org.apache.camel.Consume;
026    import org.apache.camel.Consumer;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.PollingConsumer;
029    import org.apache.camel.Processor;
030    import org.apache.camel.Producer;
031    import org.apache.camel.ProducerTemplate;
032    import org.apache.camel.Service;
033    import org.apache.camel.component.bean.BeanProcessor;
034    import org.apache.camel.component.bean.ProxyHelper;
035    import org.apache.camel.util.CamelContextHelper;
036    import org.apache.camel.util.ObjectHelper;
037    import org.apache.commons.logging.Log;
038    import org.apache.commons.logging.LogFactory;
039    
040    /**
041     * A helper class for Camel based injector or post processing hooks which can be reused by
042     * both the <a href="http://camel.apache.org/spring.html">Spring</a>
043     * and <a href="http://camel.apache.org/guice.html">Guice</a> support.
044     *
045     * @version $Revision: 788297 $
046     */
047    public class CamelPostProcessorHelper implements CamelContextAware {
048        private static final transient Log LOG = LogFactory.getLog(CamelPostProcessorHelper.class);
049    
050        @XmlTransient
051        private CamelContext camelContext;
052    
053        public CamelPostProcessorHelper() {
054        }
055    
056        public CamelPostProcessorHelper(CamelContext camelContext) {
057            this.setCamelContext(camelContext);
058        }
059    
060        public CamelContext getCamelContext() {
061            return camelContext;
062        }
063    
064        public void setCamelContext(CamelContext camelContext) {
065            this.camelContext = camelContext;
066        }
067    
068        /**
069         * Does the given context match this camel context
070         */
071        public boolean matchContext(String context) {
072            if (ObjectHelper.isNotEmpty(context)) {
073                if (!camelContext.getName().equals(context)) {
074                    return false;
075                }
076            }
077            return true;
078        }
079    
080        public void consumerInjection(Method method, Object bean) {
081            Consume consume = method.getAnnotation(Consume.class);
082            if (consume != null && matchContext(consume.context())) {
083                LOG.info("Creating a consumer for: " + consume);
084                subscribeMethod(method, bean, consume.uri(), consume.ref());
085            }
086        }
087    
088        public void subscribeMethod(Method method, Object bean, String endpointUri, String endpointName) {
089            // lets bind this method to a listener
090            String injectionPointName = method.getName();
091            Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
092            if (endpoint != null) {
093                try {
094                    Processor processor = createConsumerProcessor(bean, method, endpoint);
095                    Consumer consumer = endpoint.createConsumer(processor);
096                    if (LOG.isDebugEnabled()) {
097                        LOG.debug("Created processor: " + processor + " for consumer: " + consumer);
098                    }
099                    startService(consumer);
100                } catch (Exception e) {
101                    throw ObjectHelper.wrapRuntimeCamelException(e);
102                }
103            }
104        }
105    
106        public void startService(Service service) throws Exception {
107            CamelContext camelContext = getCamelContext();
108            if (camelContext instanceof DefaultCamelContext) {
109                DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext;
110                defaultCamelContext.addService(service);
111            } else {
112                service.start();
113            }
114        }
115    
116        /**
117         * Create a processor which invokes the given method when an incoming
118         * message exchange is received
119         */
120        protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
121            BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
122            answer.setMethodObject(method);
123            return answer;
124        }
125    
126        protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
127            return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
128        }
129    
130        /**
131         * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
132         */
133        public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName) {
134            if (type.isAssignableFrom(ProducerTemplate.class)) {
135                return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName);
136            } else {
137                Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
138                if (endpoint != null) {
139                    if (type.isInstance(endpoint)) {
140                        return endpoint;
141                    } else if (type.isAssignableFrom(Producer.class)) {
142                        return createInjectionProducer(endpoint);
143                    } else if (type.isAssignableFrom(PollingConsumer.class)) {
144                        return createInjectionPollingConsumer(endpoint);
145                    } else if (type.isInterface()) {
146                        // lets create a proxy
147                        try {
148                            return ProxyHelper.createProxy(endpoint, type);
149                        } catch (Exception e) {
150                            throw createProxyInstantiationRuntimeException(type, endpoint, e);
151                        }
152                    } else {
153                        throw new IllegalArgumentException("Invalid type: " + type.getName()
154                                + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
155                    }
156                }
157                return null;
158            }
159        }
160    
161        /**
162         * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
163         */
164        protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
165            // endpoint is optional for this injection point
166            Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
167            return new DefaultProducerTemplate(getCamelContext(), endpoint);
168        }
169    
170        /**
171         * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
172         */
173        protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) {
174            try {
175                PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
176                startService(pollingConsumer);
177                return pollingConsumer;
178            } catch (Exception e) {
179                throw ObjectHelper.wrapRuntimeCamelException(e);
180            }
181        }
182    
183        /**
184         * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
185         */
186        protected Producer createInjectionProducer(Endpoint endpoint) {
187            try {
188                Producer producer = endpoint.createProducer();
189                startService(producer);
190                return producer;
191            } catch (Exception e) {
192                throw ObjectHelper.wrapRuntimeCamelException(e);
193            }
194        }
195    
196        protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
197            return new ProxyInstantiationException(type, endpoint, e);
198        }
199    }