001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ScheduledExecutorService;
020    import java.util.concurrent.ScheduledFuture;
021    import java.util.concurrent.TimeUnit;
022    
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Processor;
025    import org.apache.camel.util.ObjectHelper;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    
029    /**
030     * A useful base class for any consumer which is polling based
031     * 
032     * @version $Revision: 734684 $
033     */
034    public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable {
035        private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
036    
037        private final ScheduledExecutorService executor;
038        private ScheduledFuture<?> future;
039        private Exception firstExceptionThrown;
040    
041        // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
042        private long initialDelay = 1000;
043        private long delay = 500;
044        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
045        private boolean useFixedDelay;
046    
047        public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
048            this(endpoint, processor, endpoint.getExecutorService());
049        }
050    
051        public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
052            super(endpoint, processor);
053            this.executor = executor;
054            ObjectHelper.notNull(executor, "executor");
055        }
056    
057        /**
058         * Invoked whenever we should be polled
059         */
060        public void run() {
061            try {
062                if (isRunAllowed()) {
063                    if (LOG.isTraceEnabled()) {
064                        LOG.trace("Starting to poll: " + this.getEndpoint());
065                    }
066                    poll();
067                }
068            } catch (Exception e) {
069                LOG.warn("An exception occurred while polling: " + this.getEndpoint() + ": " + e.getMessage(), e);
070                if (firstExceptionThrown == null) {
071                    firstExceptionThrown = e;
072                }
073            }
074    
075            if (LOG.isTraceEnabled()) {
076                LOG.trace("Finished polling: " + this.getEndpoint());
077            }
078        }
079    
080        // Properties
081        // -------------------------------------------------------------------------
082        public long getInitialDelay() {
083            return initialDelay;
084        }
085    
086        public void setInitialDelay(long initialDelay) {
087            this.initialDelay = initialDelay;
088        }
089    
090        public long getDelay() {
091            return delay;
092        }
093    
094        public void setDelay(long delay) {
095            this.delay = delay;
096        }
097    
098        public TimeUnit getTimeUnit() {
099            return timeUnit;
100        }
101    
102        public void setTimeUnit(TimeUnit timeUnit) {
103            this.timeUnit = timeUnit;
104        }
105    
106        public boolean isUseFixedDelay() {
107            return useFixedDelay;
108        }
109    
110        public void setUseFixedDelay(boolean useFixedDelay) {
111            this.useFixedDelay = useFixedDelay;
112        }
113    
114        // Implementation methods
115        // -------------------------------------------------------------------------
116    
117        /**
118         * The polling method which is invoked periodically to poll this consumer
119         * 
120         * @throws Exception can be thrown if an exception occurred during polling
121         */
122        protected abstract void poll() throws Exception;
123    
124        @Override
125        protected void doStart() throws Exception {
126            firstExceptionThrown = null;
127            super.doStart();
128            if (isUseFixedDelay()) {
129                future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
130            } else {
131                future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
132            }
133        }
134    
135        @Override
136        protected void doStop() throws Exception {
137            if (future != null) {
138                future.cancel(false);
139            }
140            super.doStop();
141    
142            if (firstExceptionThrown != null) {
143                throw firstExceptionThrown;
144            }
145        }
146    }