001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.lang.reflect.Method; 020 021 import javax.xml.bind.annotation.XmlTransient; 022 023 import org.apache.camel.CamelContext; 024 import org.apache.camel.CamelContextAware; 025 import org.apache.camel.Consume; 026 import org.apache.camel.Consumer; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.PollingConsumer; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Producer; 031 import org.apache.camel.ProducerTemplate; 032 import org.apache.camel.Service; 033 import org.apache.camel.component.bean.BeanProcessor; 034 import org.apache.camel.component.bean.ProxyHelper; 035 import org.apache.camel.util.CamelContextHelper; 036 import org.apache.camel.util.ObjectHelper; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * A helper class for Camel based injector or post processing hooks which can be reused by 042 * both the <a href="http://camel.apache.org/spring.html">Spring</a> 043 * and <a href="http://camel.apache.org/guice.html">Guice</a> support. 044 * 045 * @version $Revision: 788297 $ 046 */ 047 public class CamelPostProcessorHelper implements CamelContextAware { 048 private static final transient Log LOG = LogFactory.getLog(CamelPostProcessorHelper.class); 049 050 @XmlTransient 051 private CamelContext camelContext; 052 053 public CamelPostProcessorHelper() { 054 } 055 056 public CamelPostProcessorHelper(CamelContext camelContext) { 057 this.setCamelContext(camelContext); 058 } 059 060 public CamelContext getCamelContext() { 061 return camelContext; 062 } 063 064 public void setCamelContext(CamelContext camelContext) { 065 this.camelContext = camelContext; 066 } 067 068 /** 069 * Does the given context match this camel context 070 */ 071 public boolean matchContext(String context) { 072 if (ObjectHelper.isNotEmpty(context)) { 073 if (!camelContext.getName().equals(context)) { 074 return false; 075 } 076 } 077 return true; 078 } 079 080 public void consumerInjection(Method method, Object bean) { 081 Consume consume = method.getAnnotation(Consume.class); 082 if (consume != null && matchContext(consume.context())) { 083 LOG.info("Creating a consumer for: " + consume); 084 subscribeMethod(method, bean, consume.uri(), consume.ref()); 085 } 086 } 087 088 public void subscribeMethod(Method method, Object bean, String endpointUri, String endpointName) { 089 // lets bind this method to a listener 090 String injectionPointName = method.getName(); 091 Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true); 092 if (endpoint != null) { 093 try { 094 Processor processor = createConsumerProcessor(bean, method, endpoint); 095 Consumer consumer = endpoint.createConsumer(processor); 096 if (LOG.isDebugEnabled()) { 097 LOG.debug("Created processor: " + processor + " for consumer: " + consumer); 098 } 099 startService(consumer); 100 } catch (Exception e) { 101 throw ObjectHelper.wrapRuntimeCamelException(e); 102 } 103 } 104 } 105 106 public void startService(Service service) throws Exception { 107 CamelContext camelContext = getCamelContext(); 108 if (camelContext instanceof DefaultCamelContext) { 109 DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext; 110 defaultCamelContext.addService(service); 111 } else { 112 service.start(); 113 } 114 } 115 116 /** 117 * Create a processor which invokes the given method when an incoming 118 * message exchange is received 119 */ 120 protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) { 121 BeanProcessor answer = new BeanProcessor(pojo, getCamelContext()); 122 answer.setMethodObject(method); 123 return answer; 124 } 125 126 protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) { 127 return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory); 128 } 129 130 /** 131 * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point 132 */ 133 public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName) { 134 if (type.isAssignableFrom(ProducerTemplate.class)) { 135 return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName); 136 } else { 137 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true); 138 if (endpoint != null) { 139 if (type.isInstance(endpoint)) { 140 return endpoint; 141 } else if (type.isAssignableFrom(Producer.class)) { 142 return createInjectionProducer(endpoint); 143 } else if (type.isAssignableFrom(PollingConsumer.class)) { 144 return createInjectionPollingConsumer(endpoint); 145 } else if (type.isInterface()) { 146 // lets create a proxy 147 try { 148 return ProxyHelper.createProxy(endpoint, type); 149 } catch (Exception e) { 150 throw createProxyInstantiationRuntimeException(type, endpoint, e); 151 } 152 } else { 153 throw new IllegalArgumentException("Invalid type: " + type.getName() 154 + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint); 155 } 156 } 157 return null; 158 } 159 } 160 161 /** 162 * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO 163 */ 164 protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) { 165 // endpoint is optional for this injection point 166 Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false); 167 return new DefaultProducerTemplate(getCamelContext(), endpoint); 168 } 169 170 /** 171 * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO 172 */ 173 protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint) { 174 try { 175 PollingConsumer pollingConsumer = endpoint.createPollingConsumer(); 176 startService(pollingConsumer); 177 return pollingConsumer; 178 } catch (Exception e) { 179 throw ObjectHelper.wrapRuntimeCamelException(e); 180 } 181 } 182 183 /** 184 * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO 185 */ 186 protected Producer createInjectionProducer(Endpoint endpoint) { 187 try { 188 Producer producer = endpoint.createProducer(); 189 startService(producer); 190 return producer; 191 } catch (Exception e) { 192 throw ObjectHelper.wrapRuntimeCamelException(e); 193 } 194 } 195 196 protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) { 197 return new ProxyInstantiationException(type, endpoint, e); 198 } 199 }