001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.ScheduledExecutorService;
021    import java.util.concurrent.ScheduledFuture;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.Endpoint;
025    import org.apache.camel.PollingConsumerPollStrategy;
026    import org.apache.camel.Processor;
027    import org.apache.camel.util.ObjectHelper;
028    import org.apache.camel.util.concurrent.ExecutorServiceHelper;
029    import org.apache.commons.logging.Log;
030    import org.apache.commons.logging.LogFactory;
031    
032    /**
033     * A useful base class for any consumer which is polling based
034     * 
035     * @version $Revision: 789216 $
036     */
037    public abstract class ScheduledPollConsumer extends DefaultConsumer implements Runnable {
038        private static final transient Log LOG = LogFactory.getLog(ScheduledPollConsumer.class);
039    
040        private final ScheduledExecutorService executor;
041        private ScheduledFuture<?> future;
042    
043        // if adding more options then align with ScheduledPollEndpoint#configureScheduledPollConsumerProperties
044        private long initialDelay = 1000;
045        private long delay = 500;
046        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
047        private boolean useFixedDelay;
048        private PollingConsumerPollStrategy pollStrategy = new DefaultPollingConsumerPollStrategy();
049    
050        public ScheduledPollConsumer(DefaultEndpoint endpoint, Processor processor) {
051            super(endpoint, processor);
052    
053            ScheduledExecutorService scheduled;
054            ExecutorService service = endpoint.getExecutorService();
055            if (service instanceof ScheduledExecutorService) {
056                scheduled = (ScheduledExecutorService) service;
057            } else {
058                scheduled = ExecutorServiceHelper.newScheduledThreadPool(5, getEndpoint().getEndpointUri(), true);
059            }
060    
061            this.executor = scheduled;
062            ObjectHelper.notNull(executor, "executor");
063        }
064    
065        public ScheduledPollConsumer(Endpoint endpoint, Processor processor, ScheduledExecutorService executor) {
066            super(endpoint, processor);
067            this.executor = executor;
068            ObjectHelper.notNull(executor, "executor");
069        }
070    
071        /**
072         * Invoked whenever we should be polled
073         */
074        public void run() {
075            int retryCounter = -1;
076            boolean done = false;
077    
078            while (!done) {
079                try {
080                    // eager assume we are done
081                    done = true;
082                    if (isRunAllowed()) {
083    
084                        if (retryCounter == -1) {
085                            if (LOG.isTraceEnabled()) {
086                                LOG.trace("Starting to poll: " + this.getEndpoint());
087                            }
088                        } else {
089                            if (LOG.isDebugEnabled()) {
090                                if (LOG.isDebugEnabled()) {
091                                    LOG.debug("Retrying attempt " + retryCounter + " to poll: " + this.getEndpoint());
092                                }
093                            }
094                        }
095    
096                        pollStrategy.begin(this, getEndpoint());
097                        retryCounter++;
098                        poll();
099                        pollStrategy.commit(this, getEndpoint());
100                    }
101                } catch (Exception e) {
102                    try {
103                        boolean retry = pollStrategy.rollback(this, getEndpoint(), retryCounter, e);
104                        if (retry) {
105                            done = false;
106                        }
107                    } catch (Exception re) {
108                        throw ObjectHelper.wrapRuntimeCamelException(re);
109                    }
110                }
111            }
112    
113            if (LOG.isTraceEnabled()) {
114                LOG.trace("Finished polling: " + this.getEndpoint());
115            }
116        }
117    
118        // Properties
119        // -------------------------------------------------------------------------
120        public long getInitialDelay() {
121            return initialDelay;
122        }
123    
124        public void setInitialDelay(long initialDelay) {
125            this.initialDelay = initialDelay;
126        }
127    
128        public long getDelay() {
129            return delay;
130        }
131    
132        public void setDelay(long delay) {
133            this.delay = delay;
134        }
135    
136        public TimeUnit getTimeUnit() {
137            return timeUnit;
138        }
139    
140        public void setTimeUnit(TimeUnit timeUnit) {
141            this.timeUnit = timeUnit;
142        }
143    
144        public boolean isUseFixedDelay() {
145            return useFixedDelay;
146        }
147    
148        public void setUseFixedDelay(boolean useFixedDelay) {
149            this.useFixedDelay = useFixedDelay;
150        }
151    
152        public PollingConsumerPollStrategy getPollStrategy() {
153            return pollStrategy;
154        }
155    
156        public void setPollStrategy(PollingConsumerPollStrategy pollStrategy) {
157            this.pollStrategy = pollStrategy;
158        }
159    
160        // Implementation methods
161        // -------------------------------------------------------------------------
162    
163        /**
164         * The polling method which is invoked periodically to poll this consumer
165         * 
166         * @throws Exception can be thrown if an exception occurred during polling
167         */
168        protected abstract void poll() throws Exception;
169    
170        @Override
171        protected void doStart() throws Exception {
172            super.doStart();
173            if (isUseFixedDelay()) {
174                future = executor.scheduleWithFixedDelay(this, getInitialDelay(), getDelay(), getTimeUnit());
175            } else {
176                future = executor.scheduleAtFixedRate(this, getInitialDelay(), getDelay(), getTimeUnit());
177            }
178        }
179    
180        @Override
181        protected void doStop() throws Exception {
182            if (future != null) {
183                future.cancel(false);
184            }
185            super.doStop();
186        }
187    }