001    /**
002     *
003     * Licensed to the Apache Software Foundation (ASF) under one or more
004     * contributor license agreements.  See the NOTICE file distributed with
005     * this work for additional information regarding copyright ownership.
006     * The ASF licenses this file to You under the Apache License, Version 2.0
007     * (the "License"); you may not use this file except in compliance with
008     * the License.  You may obtain a copy of the License at
009     *
010     * http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.camel.bam.processor;
019    
020    import org.apache.camel.bam.model.ActivityState;
021    import org.apache.camel.bam.rules.ProcessRules;
022    import org.apache.camel.impl.ServiceSupport;
023    import org.apache.commons.logging.Log;
024    import org.apache.commons.logging.LogFactory;
025    import org.springframework.orm.jpa.JpaCallback;
026    import org.springframework.orm.jpa.JpaTemplate;
027    import org.springframework.transaction.TransactionStatus;
028    import org.springframework.transaction.support.TransactionCallbackWithoutResult;
029    import org.springframework.transaction.support.TransactionTemplate;
030    
031    import javax.persistence.EntityManager;
032    import javax.persistence.LockModeType;
033    import javax.persistence.PersistenceException;
034    import java.util.Date;
035    import java.util.List;
036    
037    /**
038     * A timer engine to monitor for expired activities and perform whatever actions are required.
039     *
040     * @version $Revision: $
041     */
042    public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
043        private static final Log log = LogFactory.getLog(ActivityMonitorEngine.class);
044        private JpaTemplate template;
045        private TransactionTemplate transactionTemplate;
046        private ProcessRules rules;
047        private int escalateLevel = 0;
048        private long windowMillis = 1000L;
049        private Thread thread;
050        private boolean useLocking = false;
051    
052        public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
053            this.template = template;
054            this.transactionTemplate = transactionTemplate;
055            this.rules = rules;
056        }
057    
058        public boolean isUseLocking() {
059            return useLocking;
060        }
061    
062        public void setUseLocking(boolean useLocking) {
063            this.useLocking = useLocking;
064        }
065    
066        public void run() {
067            log.debug("Starting to poll for timeout events");
068    
069            while (!isStopped()) {
070                try {
071                    long now = System.currentTimeMillis();
072                    long nextPoll = now + windowMillis;
073                    final Date timeNow = new Date(now);
074    
075                    transactionTemplate.execute(new TransactionCallbackWithoutResult() {
076                        protected void doInTransactionWithoutResult(TransactionStatus status) {
077                            List<ActivityState> list = template.find("select x from " + ActivityState.class.getName() + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow);
078                            for (ActivityState activityState : list) {
079                                fireExpiredEvent(activityState);
080                            }
081                        }
082                    });
083    
084                    long timeToSleep = nextPoll - System.currentTimeMillis();
085                    if (timeToSleep > 0) {
086                        if (log.isDebugEnabled()) {
087                            log.debug("Sleeping for " + timeToSleep + " millis");
088                        }
089                        try {
090                            Thread.sleep(timeToSleep);
091                        }
092                        catch (InterruptedException e) {
093                            log.debug("Caught: " + e, e);
094                        }
095                    }
096                }
097                catch (Exception e) {
098                    log.error("Caught: " + e, e);
099                }
100            }
101        }
102    
103        protected void fireExpiredEvent(final ActivityState activityState) {
104            if (log.isDebugEnabled()) {
105                log.debug("Trying to fire expiration of: " + activityState);
106            }
107    
108            template.execute(new JpaCallback() {
109                public Object doInJpa(EntityManager entityManager) throws PersistenceException {
110                    // lets try lock the object first
111                    if (isUseLocking()) {
112                        log.info("Attempting to lock: " + activityState);
113                        entityManager.lock(activityState, LockModeType.WRITE);
114                        log.info("Grabbed lock: " + activityState);
115                    }
116    
117                    try {
118                        rules.processExpired(activityState);
119                    }
120                    catch (Exception e) {
121                        log.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
122                    }
123                    activityState.setEscalationLevel(escalateLevel + 1);
124                    return null;
125                }
126            });
127        }
128    
129        protected void doStart() throws Exception {
130            rules.start();
131            thread = new Thread(this, "ActivityMonitorEngine");
132            thread.start();
133        }
134    
135        protected void doStop() throws Exception {
136            if (thread != null) {
137                thread = null;
138            }
139            rules.stop();
140        }
141    }