/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.pojo;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;

/**
 * A consumer for a {@link PojoEndpoint} that doubles as the
 * {@link InvocationHandler} of the proxies it creates: every method call made
 * on such a proxy is wrapped in a {@link PojoInvocation}, placed on a new
 * {@link PojoExchange} and handed to this consumer's processor.
 *
 * @version $Revision: 525547 $
 */
public class PojoConsumer extends DefaultConsumer<PojoExchange> implements InvocationHandler {

    private final PojoEndpoint endpoint;

    /**
     * Creates a consumer bound to the given endpoint.
     *
     * @param endpoint  the endpoint this consumer is registered under
     * @param processor the processor that receives the exchanges created by {@link #invoke}
     */
    public PojoConsumer(PojoEndpoint endpoint, Processor<PojoExchange> processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    /**
     * Registers this consumer with the endpoint's component under the
     * endpoint's pojo id.
     *
     * @throws RuntimeCamelException if the component already has a consumer
     *                               registered for that pojo id
     */
    @Override
    protected void doStart() throws Exception {
        PojoComponent component = endpoint.getComponent();
        PojoConsumer registered = component.getConsumer(endpoint.getPojoId());
        if (registered != null) {
            throw new RuntimeCamelException("There is a consumer already registered for endpoint: " + endpoint.getEndpointUri());
        }
        component.addConsumer(endpoint.getPojoId(), this);
    }

    /**
     * Removes this consumer's registration from the endpoint's component.
     * <p>
     * The registration is only removed when the consumer currently registered
     * for the pojo id is this very instance. {@link #doStart()} refuses to
     * register when another consumer already holds the id, so stopping this
     * instance must not evict that other consumer.
     */
    @Override
    protected void doStop() throws Exception {
        PojoComponent component = endpoint.getComponent();
        if (component.getConsumer(endpoint.getPojoId()) == this) {
            component.removeConsumer(endpoint.getPojoId());
        }
    }

    /**
     * Creates a Proxy which generates inbound exchanges on the consumer.
     *
     * @param cl         the class loader used to define the proxy class
     * @param interfaces the interfaces the proxy implements
     * @return a proxy whose method calls are dispatched to {@link #invoke}
     */
    public Object createProxy(ClassLoader cl, Class<?>[] interfaces) {
        return Proxy.newProxyInstance(cl, interfaces, this);
    }

    /**
     * Creates a Proxy which generates inbound exchanges on the consumer, using
     * the class loader of the first interface in the array.
     *
     * @param interfaces the interfaces the proxy implements; must contain at least one entry
     * @return a proxy whose method calls are dispatched to {@link #invoke}
     * @throws IllegalArgumentException if {@code interfaces} is {@code null} or empty
     */
    public Object createProxy(Class<?>[] interfaces) {
        // Reject null explicitly so callers get the documented exception
        // rather than an incidental NullPointerException from ".length".
        if (interfaces == null || interfaces.length < 1) {
            throw new IllegalArgumentException("You must provide at least 1 interface class.");
        }
        return createProxy(interfaces[0].getClassLoader(), interfaces);
    }

    /**
     * Creates a Proxy which generates inbound exchanges on the consumer.
     *
     * @param cl             the class loader used to define the proxy class
     * @param interfaceClass the single interface the proxy implements
     * @return a proxy typed as {@code interfaceClass}
     */
    @SuppressWarnings("unchecked")
    public <T> T createProxy(ClassLoader cl, Class<T> interfaceClass) {
        // Safe: the proxy is created implementing exactly interfaceClass.
        return (T) createProxy(cl, new Class<?>[] {interfaceClass});
    }

    /**
     * Creates a Proxy which generates inbound exchanges on the consumer, using
     * the class loader of {@code interfaceClass}.
     *
     * @param interfaceClass the single interface the proxy implements
     * @return a proxy typed as {@code interfaceClass}
     */
    @SuppressWarnings("unchecked")
    public <T> T createProxy(Class<T> interfaceClass) {
        // Safe: the proxy is created implementing exactly interfaceClass.
        return (T) createProxy(new Class<?>[] {interfaceClass});
    }

    /**
     * Handles a method call made on a proxy created by this consumer by
     * sending it through the processor as a {@link PojoExchange}.
     *
     * @param proxy  the proxy instance the method was invoked on
     * @param method the invoked interface method
     * @param args   the call arguments
     * @return the body of the exchange's out message after processing
     * @throws IllegalStateException     if this consumer is not started
     * @throws InvocationTargetException wrapping the exception recorded on the
     *                                   exchange, if there is one
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (!isStarted()) {
            throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
        }

        PojoExchange exchange = getEndpoint().createExchange();
        exchange.setInvocation(new PojoInvocation(proxy, method, args));
        getProcessor().process(exchange);

        Throwable fault = exchange.getException();
        if (fault != null) {
            // NOTE(review): InvocationTargetException is a checked exception, so
            // per the java.lang.reflect.InvocationHandler contract the proxy
            // caller sees it wrapped in UndeclaredThrowableException unless the
            // interface method declares a compatible type. Consider rethrowing
            // the fault itself; kept as-is to preserve current behaviour.
            throw new InvocationTargetException(fault);
        }
        return exchange.getOut().getBody();
    }
}