001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.bam.processor; 019 020 import org.apache.camel.bam.model.ActivityState; 021 import org.apache.camel.bam.rules.ProcessRules; 022 import org.apache.camel.impl.ServiceSupport; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 import org.springframework.orm.jpa.JpaCallback; 026 import org.springframework.orm.jpa.JpaTemplate; 027 import org.springframework.transaction.TransactionStatus; 028 import org.springframework.transaction.support.TransactionCallbackWithoutResult; 029 import org.springframework.transaction.support.TransactionTemplate; 030 031 import javax.persistence.EntityManager; 032 import javax.persistence.LockModeType; 033 import javax.persistence.PersistenceException; 034 import java.util.Date; 035 import java.util.List; 036 037 /** 038 * A timer engine to monitor for expired activities and perform whatever actions are required. 039 * 040 * @version $Revision: $ 041 */ 042 public class ActivityMonitorEngine extends ServiceSupport implements Runnable { 043 private static final Log log = LogFactory.getLog(ActivityMonitorEngine.class); 044 private JpaTemplate template; 045 private TransactionTemplate transactionTemplate; 046 private ProcessRules rules; 047 private int escalateLevel = 0; 048 private long windowMillis = 1000L; 049 private Thread thread; 050 private boolean useLocking = false; 051 052 public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) { 053 this.template = template; 054 this.transactionTemplate = transactionTemplate; 055 this.rules = rules; 056 } 057 058 public boolean isUseLocking() { 059 return useLocking; 060 } 061 062 public void setUseLocking(boolean useLocking) { 063 this.useLocking = useLocking; 064 } 065 066 public void run() { 067 log.debug("Starting to poll for timeout events"); 068 069 while (!isStopped()) { 070 try { 071 long now = System.currentTimeMillis(); 072 long nextPoll = now + windowMillis; 073 final Date timeNow = new Date(now); 074 075 transactionTemplate.execute(new TransactionCallbackWithoutResult() { 076 protected void doInTransactionWithoutResult(TransactionStatus status) { 077 List<ActivityState> list = template.find("select x from " + ActivityState.class.getName() + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow); 078 for (ActivityState activityState : list) { 079 fireExpiredEvent(activityState); 080 } 081 } 082 }); 083 084 long timeToSleep = nextPoll - System.currentTimeMillis(); 085 if (timeToSleep > 0) { 086 if (log.isDebugEnabled()) { 087 log.debug("Sleeping for " + timeToSleep + " millis"); 088 } 089 try { 090 Thread.sleep(timeToSleep); 091 } 092 catch (InterruptedException e) { 093 log.debug("Caught: " + e, e); 094 } 095 } 096 } 097 catch (Exception e) { 098 log.error("Caught: " + e, e); 099 } 100 } 101 } 102 103 protected void fireExpiredEvent(final ActivityState activityState) { 104 if (log.isDebugEnabled()) { 105 log.debug("Trying to fire expiration of: " + activityState); 106 } 107 108 template.execute(new JpaCallback() { 109 public Object doInJpa(EntityManager entityManager) throws PersistenceException { 110 // lets try lock the object first 111 if (isUseLocking()) { 112 log.info("Attempting to lock: " + activityState); 113 entityManager.lock(activityState, LockModeType.WRITE); 114 log.info("Grabbed lock: " + activityState); 115 } 116 117 try { 118 rules.processExpired(activityState); 119 } 120 catch (Exception e) { 121 log.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e); 122 } 123 activityState.setEscalationLevel(escalateLevel + 1); 124 return null; 125 } 126 }); 127 } 128 129 protected void doStart() throws Exception { 130 rules.start(); 131 thread = new Thread(this, "ActivityMonitorEngine"); 132 thread.start(); 133 } 134 135 protected void doStop() throws Exception { 136 if (thread != null) { 137 thread = null; 138 } 139 rules.stop(); 140 } 141 }