/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.pojo.timer;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.camel.Processor;
import org.apache.camel.component.pojo.PojoExchange;
import org.apache.camel.component.pojo.PojoInvocation;
import org.apache.camel.impl.DefaultConsumer;

/**
 * Consumer that drives its {@link Processor} from a {@link java.util.Timer}.
 * <p>
 * On start it registers itself with the endpoint's {@link TimerComponent} and
 * schedules a {@link TimerTask} according to the endpoint settings (timer name,
 * daemon flag, fixed-rate flag, start time or delay, and period). Every firing
 * of the task calls {@code run()} on a {@link Runnable} proxy whose invocation
 * handler is this consumer; {@link #invoke(Object, Method, Object[])} turns that
 * call into a {@link PojoExchange} and hands it to the processor.
 *
 * @version $Revision: 523047 $
 */
public class TimerConsumer extends DefaultConsumer<PojoExchange> implements InvocationHandler {

    private final TimerEndpoint endpoint;
    private Timer timer;

    /**
     * @param endpoint  the timer endpoint supplying the scheduling settings
     * @param processor the processor that receives one exchange per timer firing
     */
    public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    /**
     * Registers this consumer with the endpoint's component and starts the timer.
     */
    @Override
    protected void doStart() throws Exception {
        TimerComponent component = endpoint.getComponent();
        component.addConsumer(this);
        timer = createTimerAndTask();
    }

    /**
     * Cancels the timer (if one was created) and unregisters this consumer
     * from the endpoint's component.
     */
    @Override
    protected void doStop() throws Exception {
        if (timer != null) {
            timer.cancel();
            // A cancelled Timer cannot be reused; drop the stale reference.
            timer = null;
        }
        TimerComponent component = endpoint.getComponent();
        component.removeConsumer(this);
    }

    /**
     * Creates the timer and schedules the task that fires the exchanges.
     *
     * @return the running timer with its task scheduled
     * @throws RuntimeException whatever the {@link Timer} scheduling call
     *             rejects (for example a non-positive period in fixed-rate
     *             mode); the freshly created timer is cancelled first
     */
    private Timer createTimerAndTask() {
        final Runnable proxy = createProxy();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                try {
                    proxy.run();
                } catch (RuntimeException e) {
                    // An exception escaping TimerTask.run() terminates the
                    // Timer thread and silently stops all further firings.
                    // Report the failure instead and keep the timer alive.
                    reportTaskFailure(e);
                }
            }
        };

        Timer result = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
        try {
            scheduleTask(result, task);
        } catch (RuntimeException e) {
            // new Timer(..) has already started its thread; do not leak it
            // when the schedule arguments are rejected.
            result.cancel();
            throw e;
        }
        return result;
    }

    /**
     * Schedules {@code task} on {@code target} using the endpoint settings.
     * An absolute start time, when set, takes precedence over the delay.
     * Outside fixed-rate mode a period that is not strictly positive means
     * "fire once": {@link Timer} rejects periods {@code <= 0}, so zero must
     * not be passed through as a repeat interval.
     */
    private void scheduleTask(Timer target, TimerTask task) {
        long period = endpoint.getPeriod();
        boolean absoluteStart = endpoint.getTime() != null;

        if (endpoint.isFixedRate()) {
            if (absoluteStart) {
                target.scheduleAtFixedRate(task, endpoint.getTime(), period);
            } else {
                target.scheduleAtFixedRate(task, endpoint.getDelay(), period);
            }
        } else if (period > 0) {
            if (absoluteStart) {
                target.schedule(task, endpoint.getTime(), period);
            } else {
                target.schedule(task, endpoint.getDelay(), period);
            }
        } else {
            if (absoluteStart) {
                target.schedule(task, endpoint.getTime());
            } else {
                target.schedule(task, endpoint.getDelay());
            }
        }
    }

    /**
     * Hands a failure raised while firing the timer to the current thread's
     * uncaught-exception handler, i.e. the same channel that would have
     * reported it had it escaped the task, but without ending the thread.
     */
    private static void reportTaskFailure(Throwable failure) {
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, failure);
    }

    /**
     * Creates a Proxy which generates the inbound PojoExchanges
     *
     * @return a {@link Runnable} whose {@code run()} is dispatched to
     *         {@link #invoke(Object, Method, Object[])} on this consumer
     */
    public Runnable createProxy() {
        return (Runnable) Proxy.newProxyInstance(Runnable.class.getClassLoader(),
                new Class<?>[] {Runnable.class}, this);
    }

    /**
     * Wraps the proxied call in a {@link PojoInvocation}, sends it through the
     * processor as a new exchange and returns the body of the out message.
     *
     * @param proxy  the proxy instance the method was called on
     * @param method the interface method that was called
     * @param args   the call arguments, as passed by the proxy
     * @return the body of the exchange's out message
     * @throws IllegalStateException if this consumer is not started
     * @throws InvocationTargetException wrapping the exception recorded on the
     *             exchange, if any. It is a checked exception that
     *             {@code Runnable.run()} does not declare, so callers going
     *             through the proxy see it as an
     *             {@link java.lang.reflect.UndeclaredThrowableException}.
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (!isStarted()) {
            throw new IllegalStateException("The endpoint is not active: " + getEndpoint().getEndpointUri());
        }
        PojoInvocation invocation = new PojoInvocation(proxy, method, args);
        PojoExchange exchange = getEndpoint().createExchange();
        exchange.setInvocation(invocation);
        getProcessor().process(exchange);
        Throwable fault = exchange.getException();
        if (fault != null) {
            throw new InvocationTargetException(fault);
        }
        return exchange.getOut().getBody();
    }

}