001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.rmi;
019    
020    import java.lang.reflect.InvocationHandler;
021    import java.lang.reflect.InvocationTargetException;
022    import java.lang.reflect.Method;
023    import java.lang.reflect.Proxy;
024    import java.rmi.Remote;
025    import java.rmi.registry.Registry;
026    import java.rmi.server.UnicastRemoteObject;
027    
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Processor;
030    import org.apache.camel.component.pojo.PojoExchange;
031    import org.apache.camel.component.pojo.PojoInvocation;
032    import org.apache.camel.impl.DefaultConsumer;
033    
034    /**
035     * A {@link Consumer} which uses RMI's {@see UnicastRemoteObject} to consume method invocations.
036     *
037     * @version $Revision: 533758 $
038     */
039    public class RmiConsumer extends DefaultConsumer<PojoExchange> implements InvocationHandler {
040    
041            private final RmiEndpoint endpoint;
042            private Remote stub;
043            private Remote proxy;
044    
045            public RmiConsumer(RmiEndpoint endpoint, Processor processor) {
046                    super(endpoint, processor);
047                    this.endpoint = endpoint;
048                    
049            }
050    
051            @Override
052            protected void doStart() throws Exception {
053                    Class[] interfaces = new Class[endpoint.getRemoteInterfaces().size()];
054                    endpoint.getRemoteInterfaces().toArray(interfaces);
055                    proxy = (Remote) Proxy.newProxyInstance(endpoint.getClassLoader(), interfaces, this);
056                    stub = UnicastRemoteObject.exportObject(proxy,endpoint.getPort());
057                    
058            try { 
059                    Registry registry = endpoint.getRegistry();
060                    String name = endpoint.getName();               
061                            registry.bind(name, stub);
062                            
063                    } catch (Exception e) { // Registration might fail.. clean up..
064                            try { 
065                                    UnicastRemoteObject.unexportObject(stub, true);
066                            } catch (Throwable e1) {
067                            }
068                            stub=null;
069                            throw e;
070                    }
071            super.doStart();
072            }
073            
074            @Override
075            protected void doStop() throws Exception {
076                    super.doStop();
077                    try {
078                    Registry registry = endpoint.getRegistry();
079                    registry.unbind(endpoint.getName());
080                    } catch( Throwable e ) { // do our best to unregister
081                    }
082                    UnicastRemoteObject.unexportObject(proxy, true);                
083            }
084            
085            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
086            if (!isStarted()) {
087                throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
088            }
089            PojoInvocation invocation = new PojoInvocation(proxy, method, args);
090            PojoExchange exchange = getEndpoint().createExchange();
091            exchange.setInvocation(invocation);
092            getProcessor().process(exchange);
093            Throwable fault = exchange.getException();
094            if (fault != null) {
095                throw new InvocationTargetException(fault);
096            }
097            return exchange.getOut().getBody();
098            }
099    
100            public Remote getProxy() {
101                    return proxy;
102            }
103    
104            public Remote getStub() {
105                    return stub;
106            }
107    }