001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.component.pojo.timer;
019    
020    import java.lang.reflect.InvocationHandler;
021    import java.lang.reflect.InvocationTargetException;
022    import java.lang.reflect.Method;
023    import java.lang.reflect.Proxy;
024    import java.util.Timer;
025    import java.util.TimerTask;
026    
027    import org.apache.camel.Processor;
028    import org.apache.camel.component.pojo.PojoExchange;
029    import org.apache.camel.component.pojo.PojoInvocation;
030    import org.apache.camel.impl.DefaultConsumer;
031    
032    /**
033     * @version $Revision: 523047 $
034     */
035    public class TimerConsumer extends DefaultConsumer<PojoExchange> implements InvocationHandler {
036    
037        private final TimerEndpoint endpoint;
038        private Timer timer;
039        
040    
041            public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
042            super(endpoint, processor);
043                    this.endpoint = endpoint;
044        }
045        
046        @Override
047        protected void doStart() throws Exception {
048            TimerComponent component = endpoint.getComponent();
049            component.addConsumer(this);            
050            timer=createTimerAndTask();
051        }
052    
053        @Override
054        protected void doStop() throws Exception {
055            if(timer!=null){
056                timer.cancel();
057            }
058            TimerComponent component = endpoint.getComponent();
059            component.removeConsumer(this);         
060        }
061        
062        private Timer createTimerAndTask(){
063            
064            final Runnable proxy = createProxy();
065            TimerTask task=new TimerTask(){
066                @Override public void run(){
067                    proxy.run();
068                }
069            };
070            
071            Timer result=new Timer(endpoint.getTimerName(),endpoint.isDaemon());
072            if(endpoint.isFixedRate()){
073                if(endpoint.getTime()!=null){
074                    result.scheduleAtFixedRate(task,endpoint.getTime(),endpoint.getPeriod());
075                }else{
076                    result.scheduleAtFixedRate(task,endpoint.getDelay(),endpoint.getPeriod());
077                }
078            }else{
079                if(endpoint.getTime()!=null){
080                    if(endpoint.getPeriod()>=0){
081                        result.schedule(task,endpoint.getTime(),endpoint.getPeriod());
082                    }else{
083                        result.schedule(task,endpoint.getTime());
084                    }
085                }else{
086                    if(endpoint.getPeriod()>=0){
087                        result.schedule(task,endpoint.getDelay(),endpoint.getPeriod());
088                    }else{
089                        result.schedule(task,endpoint.getDelay());
090                    }
091                }
092            }
093            return result;
094        }
095        
096        /**
097         * Creates a Proxy which generates the inbound PojoExchanges
098         */
099        public Runnable createProxy() {
100            return (Runnable) Proxy.newProxyInstance(Runnable.class.getClassLoader(), new Class[]{Runnable.class}, this);
101        }
102        
103            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
104            if (!isStarted()) {
105                throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
106            }
107            PojoInvocation invocation = new PojoInvocation(proxy, method, args);
108            PojoExchange exchange = getEndpoint().createExchange();
109            exchange.setInvocation(invocation);
110            getProcessor().process(exchange);
111            Throwable fault = exchange.getException();
112            if (fault != null) {
113                throw new InvocationTargetException(fault);
114            }
115            return exchange.getOut().getBody();
116            }
117      
118    
119    
120    }