/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.bam;

import org.apache.camel.bam.model.ActivityState;
import org.apache.camel.impl.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.orm.jpa.JpaCallback;
import org.springframework.orm.jpa.JpaTemplate;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.TransactionStatus;

import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.persistence.PersistenceException;
import java.util.Date;
import java.util.List;

/**
 * Background poller that looks for {@link ActivityState} rows whose
 * <code>timeOverdue</code> is in the past and whose escalation level still
 * equals {@link #escalateLevel}, and hands each of them to the configured
 * {@link ProcessRules}.
 * <p/>
 * {@link #doStart()} spawns a dedicated thread that executes {@link #run()};
 * the loop keeps going until {@link #isStopped()} reports true or the thread
 * is interrupted while sleeping between polls.
 *
 * @version $Revision: $
 */
public class ActivityMonitorEngine extends ServiceSupport implements Runnable {
    private static final Log log = LogFactory.getLog(ActivityMonitorEngine.class);

    private final JpaTemplate template;
    private final TransactionTemplate transactionTemplate;
    private final ProcessRules rules;
    /** Escalation level this engine queries for; matched states are moved to the next level. */
    private int escalateLevel = 0;
    /** Minimum time, in milliseconds, between the start of two consecutive polls. */
    private long windowMillis = 1000L;
    private Thread thread;

    /**
     * @param template            used to query and lock the activity states
     * @param transactionTemplate wraps every poll in a single transaction
     * @param rules               receives each expired activity state
     */
    public ActivityMonitorEngine(JpaTemplate template, TransactionTemplate transactionTemplate, ProcessRules rules) {
        this.template = template;
        this.transactionTemplate = transactionTemplate;
        this.rules = rules;
    }

    /**
     * Polling loop: once per window, query for overdue activities inside a
     * transaction and fire an expiration event for each of them.
     * <p/>
     * A failing poll is logged and retried on the next window. The sleep is
     * performed outside the try/catch guarding the poll so that a persistent
     * failure (e.g. the database being unavailable) cannot turn the loop into
     * a busy spin.
     */
    public void run() {
        log.info("Starting to poll for timeout events");

        while (!isStopped()) {
            long now = System.currentTimeMillis();
            long nextPoll = now + windowMillis;

            try {
                fireExpiredEvents(new Date(now));
            }
            catch (Exception e) {
                log.error("Caught: " + e, e);
            }

            if (!sleepUntil(nextPoll)) {
                // interrupted: stop polling, the interrupt flag has been restored
                break;
            }
        }
    }

    /**
     * Runs one poll inside a transaction: selects every activity state at the
     * current escalation level that became overdue before <code>timeNow</code>
     * and fires its expiration.
     */
    private void fireExpiredEvents(final Date timeNow) {
        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                List<ActivityState> list = template.find("select x from " + ActivityState.class.getName() + " x where x.escalationLevel = ?1 and x.timeOverdue < ?2", escalateLevel, timeNow);
                for (ActivityState activityState : list) {
                    fireExpiredEvent(activityState);
                }
            }
        });
    }

    /**
     * Sleeps until the given wall-clock time, if it is still in the future.
     *
     * @return <code>false</code> if the thread was interrupted while sleeping
     *         (the interrupt status is restored), <code>true</code> otherwise
     */
    private boolean sleepUntil(long nextPoll) {
        long timeToSleep = nextPoll - System.currentTimeMillis();
        if (timeToSleep <= 0) {
            return true;
        }

        log.debug("Sleeping for " + timeToSleep + " millis");
        try {
            Thread.sleep(timeToSleep);
            return true;
        }
        catch (InterruptedException e) {
            log.debug("Caught: " + e, e);
            // keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Locks the given activity state and, if it is still at the escalation
     * level this engine handles, passes it to the rules and advances it to the
     * next level.
     * <p/>
     * The level is advanced whether or not the rules processed the expiration
     * successfully; a processing failure is only logged.
     *
     * @param activityState the overdue activity state to escalate
     */
    protected void fireExpiredEvent(final ActivityState activityState) {
        log.info("Trying to fire expiration of: " + activityState);

        template.execute(new JpaCallback() {
            public Object doInJpa(EntityManager entityManager) throws PersistenceException {
                // lets try lock the object first
                entityManager.lock(activityState, LockModeType.WRITE);

                // re-check the level after taking the lock before escalating
                if (activityState.getEscalationLevel() == escalateLevel) {
                    try {
                        rules.processExpired(activityState);
                    }
                    catch (Exception e) {
                        log.error("Failed to process expiration of: " + activityState + ". Reason: " + e, e);
                    }
                    activityState.setEscalationLevel(escalateLevel + 1);
                }
                return null;
            }
        });
    }

    /**
     * Creates and starts the polling thread.
     */
    protected void doStart() throws Exception {
        thread = new Thread(this, "ActivityMonitorEngine");
        thread.start();
    }

    /**
     * Drops the reference to the polling thread. The thread itself is not
     * interrupted here; its loop ends once {@link #isStopped()} returns true.
     */
    protected void doStop() throws Exception {
        thread = null;
    }
}