001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.timer;
018    
019    import java.lang.reflect.InvocationHandler;
020    import java.lang.reflect.InvocationTargetException;
021    import java.lang.reflect.Method;
022    import java.lang.reflect.Proxy;
023    import java.util.Timer;
024    import java.util.TimerTask;
025    
026    import org.apache.camel.Processor;
027    import org.apache.camel.component.bean.BeanExchange;
028    import org.apache.camel.component.bean.BeanInvocation;
029    import org.apache.camel.impl.DefaultConsumer;
030    
031    /**
032     * @version $Revision: 523047 $
033     */
034    public class TimerConsumer extends DefaultConsumer<BeanExchange> implements InvocationHandler {
035    
036        private final TimerEndpoint endpoint;
037        private Timer timer;
038    
039        public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
040            super(endpoint, processor);
041            this.endpoint = endpoint;
042        }
043    
044        @Override
045        protected void doStart() throws Exception {
046            TimerComponent component = endpoint.getComponent();
047            component.addConsumer(this);
048            timer = createTimerAndTask();
049        }
050    
051        @Override
052        protected void doStop() throws Exception {
053            if (timer != null) {
054                timer.cancel();
055            }
056            TimerComponent component = endpoint.getComponent();
057            component.removeConsumer(this);
058        }
059    
060        private Timer createTimerAndTask() {
061    
062            final Runnable proxy = createProxy();
063            TimerTask task = new TimerTask() {
064                @Override
065                public void run() {
066                    proxy.run();
067                }
068            };
069    
070            Timer result = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
071            if (endpoint.isFixedRate()) {
072                if (endpoint.getTime() != null) {
073                    result.scheduleAtFixedRate(task, endpoint.getTime(), endpoint.getPeriod());
074                } else {
075                    result.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
076                }
077            } else {
078                if (endpoint.getTime() != null) {
079                    if (endpoint.getPeriod() >= 0) {
080                        result.schedule(task, endpoint.getTime(), endpoint.getPeriod());
081                    } else {
082                        result.schedule(task, endpoint.getTime());
083                    }
084                } else {
085                    if (endpoint.getPeriod() >= 0) {
086                        result.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
087                    } else {
088                        result.schedule(task, endpoint.getDelay());
089                    }
090                }
091            }
092            return result;
093        }
094    
095        /**
096         * Creates a Proxy which generates the inbound PojoExchanges
097         */
098        public Runnable createProxy() {
099            return (Runnable)Proxy.newProxyInstance(Runnable.class.getClassLoader(),
100                                                    new Class[] {Runnable.class}, this);
101        }
102    
103        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
104            if (!isStarted()) {
105                throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
106            }
107            BeanInvocation invocation = new BeanInvocation(proxy, method, args);
108            BeanExchange exchange = getEndpoint().createExchange();
109            exchange.setInvocation(invocation);
110            getProcessor().process(exchange);
111            Throwable fault = exchange.getException();
112            if (fault != null) {
113                throw new InvocationTargetException(fault);
114            }
115            return exchange.getOut().getBody();
116        }
117    
118    }