001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.ScheduledFuture;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.Processor;
026    import org.apache.camel.util.ObjectHelper;
027    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
028    import org.apache.commons.logging.Log;
029    import org.apache.commons.logging.LogFactory;
030    
031    /**
032     * A useful base class for any consumer which is polling based
033     * 
034     * @version $Revision: 772598 $
035     */
036    public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable {
037        private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
038    
039        private final ScheduledExecutorService executor;
040        private ScheduledFuture<?> future;
041        private Exception firstExceptionThrown;
042    
043        // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
044        private long initialDelay = 1000;
045        private long delay = 500;
046        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
047        private boolean useFixedDelay;
048    
049        public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
050            super(endpoint, processor);
051    
052            ScheduledExecutorService scheduled;
053            ExecutorService service = endpoint.getExecutorService();
054            if (service instanceof ScheduledExecutorService) {
055                scheduled = (ScheduledExecutorService) service;
056            } else {
057                scheduled = ExecutorServiceHelper.newScheduledThreadPool(5, getEndpoint().getEndpointUri(), true);
058            }
059    
060            this.executor = scheduled;
061            ObjectHelper.notNull(executor, "executor");
062        }
063    
064        public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
065            super(endpoint, processor);
066            this.executor = executor;
067            ObjectHelper.notNull(executor, "executor");
068        }
069    
070        /**
071         * Invoked whenever we should be polled
072         */
073        public void run() {
074            try {
075                if (isRunAllowed()) {
076                    if (LOG.isTraceEnabled()) {
077                        LOG.trace("Starting to poll: " + this.getEndpoint());
078                    }
079                    poll();
080                }
081            } catch (Exception e) {
082                LOG.warn("An exception occurred while polling: " + this.getEndpoint() + ": " + e.getMessage(), e);
083                if (firstExceptionThrown == null) {
084                    firstExceptionThrown = e;
085                }
086            }
087    
088            if (LOG.isTraceEnabled()) {
089                LOG.trace("Finished polling: " + this.getEndpoint());
090            }
091        }
092    
093        // Properties
094        // -------------------------------------------------------------------------
095        public long getInitialDelay() {
096            return initialDelay;
097        }
098    
099        public void setInitialDelay(long initialDelay) {
100            this.initialDelay = initialDelay;
101        }
102    
103        public long getDelay() {
104            return delay;
105        }
106    
107        public void setDelay(long delay) {
108            this.delay = delay;
109        }
110    
111        public TimeUnit getTimeUnit() {
112            return timeUnit;
113        }
114    
115        public void setTimeUnit(TimeUnit timeUnit) {
116            this.timeUnit = timeUnit;
117        }
118    
119        public boolean isUseFixedDelay() {
120            return useFixedDelay;
121        }
122    
123        public void setUseFixedDelay(boolean useFixedDelay) {
124            this.useFixedDelay = useFixedDelay;
125        }
126    
127        // Implementation methods
128        // -------------------------------------------------------------------------
129    
130        /**
131         * The polling method which is invoked periodically to poll this consumer
132         * 
133         * @throws Exception can be thrown if an exception occurred during polling
134         */
135        protected abstract void poll() throws Exception;
136    
137        @Override
138        protected void doStart() throws Exception {
139            firstExceptionThrown = null;
140            super.doStart();
141            if (isUseFixedDelay()) {
142                future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
143            } else {
144                future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
145            }
146        }
147    
148        @Override
149        protected void doStop() throws Exception {
150            if (future != null) {
151                future.cancel(false);
152            }
153            super.doStop();
154    
155            if (firstExceptionThrown != null) {
156                throw firstExceptionThrown;
157            }
158        }
159    }