/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.bam.processor;

import java.util.Date;
import java.util.List;

import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.PersistenceException;

import org.apache.camel.bam.model.ActivityState;
import org.apache.camel.bam.rules.ProcessRules;
import org.apache.camel.impl.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.orm.jpa.JpaCallback;
import org.springframework.orm.jpa.JpaTemplate;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionTemplate;

/**
 * A timer engine to monitor for expired activities and perform whatever actions
 * are required.
 * <p>
 * Once started, a dedicated thread repeatedly queries for {@link ActivityState}
 * rows whose escalation level equals the level this engine watches and whose
 * overdue time is in the past, hands each one to the configured
 * {@link ProcessRules}, and then raises its escalation level by one.
 *
 * @version $Revision: $
 */
public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
    private static final Log LOG = LogFactory.getLog(ActivityMonitorEngine.class);

    private final JpaTemplate template;
    private final TransactionTemplate transactionTemplate;
    private final ProcessRules rules;
    /** Escalation level of the activities this engine looks for. */
    private int escalateLevel;
    /** Target time, in milliseconds, between the start of two consecutive polls. */
    private long windowMillis = 1000L;
    private Thread thread;
    private boolean useLocking;

    /**
     * Creates an engine that is not yet polling; polling begins when the service is started.
     *
     * @param template            used to query and update activity states
     * @param transactionTemplate wraps every poll in a transaction
     * @param rules               the rules invoked for every expired activity
     */
    public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
        this.template = template;
        this.transactionTemplate = transactionTemplate;
        this.rules = rules;
    }

    /**
     * @return whether an activity state is write-locked before its expiration is processed
     */
    public boolean isUseLocking() {
        return useLocking;
    }

    /**
     * @param useLocking <code>true</code> to write-lock each activity state before processing it
     */
    public void setUseLocking(boolean useLocking) {
        this.useLocking = useLocking;
    }

    /**
     * @return the target time in milliseconds between the start of two consecutive polls
     */
    public long getWindowMillis() {
        return windowMillis;
    }

    /**
     * Sets the polling interval. Intended to be called before the engine is started.
     *
     * @param windowMillis target time in milliseconds between the start of two polls;
     *                     a value of zero or less means the engine never sleeps between polls
     */
    public void setWindowMillis(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Polling loop executed by the engine thread. Runs until the service reports that it is
     * stopped or the thread is interrupted while sleeping.
     */
    public void run() {
        LOG.debug("Starting to poll for timeout events");

        while (!isStopped()) {
            long now = System.currentTimeMillis();
            long nextPoll = now + windowMillis;

            try {
                fireExpiredActivities(new Date(now));
            } catch (Exception e) {
                LOG.error("Caught: " + e, e);
            }

            // The sleep is deliberately outside the try block above: when a poll fails we
            // still wait for the rest of the window instead of retrying in a tight loop.
            if (!sleepUntil(nextPoll)) {
                break;
            }
        }
    }

    /**
     * Finds, inside one transaction, every activity at the watched escalation level that became
     * overdue before the given time and fires its expiration.
     *
     * @param timeNow the instant against which the overdue time is compared
     */
    private void fireExpiredActivities(final Date timeNow) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                List<ActivityState> list = template.find("select x from " + ActivityState.class.getName()
                        + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow);
                for (ActivityState activityState : list) {
                    fireExpiredEvent(activityState);
                }
            }
        });
    }

    /**
     * Sleeps until the given wall-clock time, returning immediately if it has already passed.
     *
     * @param wakeUpTime the time, in milliseconds since the epoch, to sleep until
     * @return <code>false</code> if the thread was interrupted while sleeping (the interrupt
     *         status is restored so callers can observe it), <code>true</code> otherwise
     */
    private boolean sleepUntil(long wakeUpTime) {
        long timeToSleep = wakeUpTime - System.currentTimeMillis();
        if (timeToSleep <= 0) {
            return true;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sleeping for " + timeToSleep + " millis");
        }
        try {
            Thread.sleep(timeToSleep);
            return true;
        } catch (InterruptedException e) {
            LOG.debug("Caught: " + e, e);
            // Do not swallow the interrupt: restore the flag and let the loop terminate.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Processes the expiration of a single activity: optionally write-locks it, runs the process
     * rules against it and then sets its escalation level to one above the watched level.
     * <p>
     * A failure raised by the rules is logged and does not prevent the escalation level from
     * being raised.
     *
     * @param activityState the expired activity
     */
    protected void fireExpiredEvent(final ActivityState activityState) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Trying to fire expiration of: " + activityState);
        }

        template.execute(new JpaCallback() {
            public Object doInJpa(EntityManager entityManager) throws PersistenceException {
                // lets try lock the object first
                if (isUseLocking()) {
                    LOG.info("Attempting to lock: " + activityState);
                    entityManager.lock(activityState, LockModeType.WRITE);
                    LOG.info("Grabbed lock: " + activityState);
                }

                try {
                    rules.processExpired(activityState);
                } catch (Exception e) {
                    LOG.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
                }
                // Raised even when the rules failed, so the same row is not selected again
                // by the next poll at this escalation level.
                activityState.setEscalationLevel(escalateLevel + 1);
                return null;
            }
        });
    }

    /**
     * Starts the process rules and then launches the polling thread.
     */
    protected void doStart() throws Exception {
        rules.start();
        thread = new Thread(this, "ActivityMonitorEngine");
        thread.start();
    }

    /**
     * Drops the reference to the polling thread and stops the process rules. The thread itself
     * is not interrupted here; it leaves its loop the next time it observes {@link #isStopped()}.
     */
    protected void doStop() throws Exception {
        if (thread != null) {
            thread = null;
        }
        rules.stop();
    }
}