001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.rmi;
018    
019    import java.lang.reflect.InvocationHandler;
020    import java.lang.reflect.InvocationTargetException;
021    import java.lang.reflect.Method;
022    import java.lang.reflect.Proxy;
023    import java.rmi.Remote;
024    import java.rmi.registry.Registry;
025    import java.rmi.server.UnicastRemoteObject;
026    
027    import org.apache.camel.Processor;
028    import org.apache.camel.component.bean.BeanExchange;
029    import org.apache.camel.component.bean.BeanInvocation;
030    import org.apache.camel.impl.DefaultConsumer;
031    
032    /**
033     * A {@link Consumer} which uses RMI's {@see UnicastRemoteObject} to consume
034     * method invocations.
035     * 
036     * @version $Revision: 533758 $
037     */
038    public class RmiConsumer extends DefaultConsumer<BeanExchange> implements InvocationHandler {
039    
040        private final RmiEndpoint endpoint;
041        private Remote stub;
042        private Remote proxy;
043    
044        public RmiConsumer(RmiEndpoint endpoint, Processor processor) {
045            super(endpoint, processor);
046            this.endpoint = endpoint;
047    
048        }
049    
050        @Override
051        protected void doStart() throws Exception {
052            Class[] interfaces = new Class[endpoint.getRemoteInterfaces().size()];
053            endpoint.getRemoteInterfaces().toArray(interfaces);
054            proxy = (Remote)Proxy.newProxyInstance(endpoint.getClassLoader(), interfaces, this);
055            stub = UnicastRemoteObject.exportObject(proxy, endpoint.getPort());
056    
057            try {
058                Registry registry = endpoint.getRegistry();
059                String name = endpoint.getName();
060                registry.bind(name, stub);
061    
062            } catch (Exception e) { // Registration might fail.. clean up..
063                try {
064                    UnicastRemoteObject.unexportObject(stub, true);
065                } catch (Throwable ignore) {
066                }
067                stub = null;
068                throw e;
069            }
070            super.doStart();
071        }
072    
073        @Override
074        protected void doStop() throws Exception {
075            super.doStop();
076            try {
077                Registry registry = endpoint.getRegistry();
078                registry.unbind(endpoint.getName());
079            } catch (Throwable e) { // do our best to unregister
080            }
081            UnicastRemoteObject.unexportObject(proxy, true);
082        }
083    
084        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
085            if (!isStarted()) {
086                throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
087            }
088            BeanInvocation invocation = new BeanInvocation(proxy, method, args);
089            BeanExchange exchange = getEndpoint().createExchange();
090            exchange.setInvocation(invocation);
091            getProcessor().process(exchange);
092            Throwable fault = exchange.getException();
093            if (fault != null) {
094                throw new InvocationTargetException(fault);
095            }
096            return exchange.getOut().getBody();
097        }
098    
099        public Remote getProxy() {
100            return proxy;
101        }
102    
103        public Remote getStub() {
104            return stub;
105        }
106    }