001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.impl;
019    
020    import org.apache.camel.Endpoint;
021    import org.apache.camel.Exchange;
022    import org.apache.camel.Processor;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    
026    import java.util.concurrent.ScheduledExecutorService;
027    import java.util.concurrent.ScheduledFuture;
028    import java.util.concurrent.TimeUnit;
029    
030    /**
031     * A useful base class for any consumer which is polling based
032     *
033     * @version $Revision: 541335 $
034     */
035    public abstract class ScheduledPollConsumer<E extends Exchange> extends DefaultConsumer<E> implements Runnable {
036        private static final transient Log log = LogFactory.getLog(ScheduledPollConsumer.class);
037        
038        private final ScheduledExecutorService executor;
039        private long initialDelay = 1000;
040        private long delay = 500;
041        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
042        private boolean useFixedDelay;
043        private ScheduledFuture<?> future;
044    
045        public ScheduledPollConsumer(DefaultEndpoint<E> endpoint, Processor processor) {
046            this(endpoint, processor, endpoint.getExecutorService());
047        }
048    
049        public ScheduledPollConsumer(Endpoint<E> endpoint, Processor processor, ScheduledExecutorService executor) {
050            super(endpoint, processor);
051            this.executor = executor;
052            if (executor == null) {
053                throw new IllegalArgumentException("A non null ScheduledExecutorService must be provided.");
054            }
055        }
056    
057        /**
058         * Invoked whenever we should be polled
059         */
060        public void run() {
061            log.debug("Starting to poll");
062            try {
063                poll();
064            }
065            catch (Exception e) {
066                log.warn("Caught: " + e, e);
067            }
068        }
069    
070        // Properties
071        //-------------------------------------------------------------------------
072        public long getInitialDelay() {
073            return initialDelay;
074        }
075    
076        public void setInitialDelay(long initialDelay) {
077            this.initialDelay = initialDelay;
078        }
079    
080        public long getDelay() {
081            return delay;
082        }
083    
084        public void setDelay(long delay) {
085            this.delay = delay;
086        }
087    
088        public TimeUnit getTimeUnit() {
089            return timeUnit;
090        }
091    
092        public void setTimeUnit(TimeUnit timeUnit) {
093            this.timeUnit = timeUnit;
094        }
095    
096        public boolean isUseFixedDelay() {
097            return useFixedDelay;
098        }
099    
100        public void setUseFixedDelay(boolean useFixedDelay) {
101            this.useFixedDelay = useFixedDelay;
102        }
103    
104        // Implementation methods
105        //-------------------------------------------------------------------------
106    
107        /**
108         * The polling method which is invoked periodically to poll this consumer
109         *
110         * @throws Exception
111         */
112        protected abstract void poll() throws Exception;
113    
114        @Override
115        protected void doStart() throws Exception {
116            super.doStart();
117            if (isUseFixedDelay()) {
118                future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
119            }
120            else {
121                future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
122            }
123        }
124    
125        @Override
126        protected void doStop() throws Exception {
127            if (future != null) {
128                future.cancel(false);
129            }
130            super.doStop();
131        }
132    }