/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.bam.model;

import org.apache.camel.bam.processor.ProcessContext;
import org.apache.camel.bam.rules.ActivityRules;
import org.apache.camel.util.ObjectHelper;

import javax.persistence.CascadeType;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.ManyToOne;
import java.util.Date;

/**
 * The default state for a specific activity within a process.
 * <p>
 * Tracks how many messages have been received for the activity, the
 * expected/overdue timestamps and the current escalation level, and notifies
 * the {@link ProcessContext} when the activity starts and completes.
 *
 * @version $Revision: $
 */
@Entity
public class ActivityState extends TemporalEntity {
    private ProcessInstance processInstance;
    private Integer receivedMessageCount = 0;
    private ActivityDefinition activityDefinition;
    private Date timeExpected;
    private Date timeOverdue;
    private Integer escalationLevel = 0;

    // This override is required to work around a bug in Hibernate
    @Override
    @Id
    @GeneratedValue
    public Long getId() {
        return super.getId();
    }

    @Override
    public String toString() {
        return "ActivityState[" + getId() + " " + getActivityDefinition() + "]";
    }

    /**
     * Records the arrival of one more message for this activity and fires the
     * matching lifecycle callback(s).
     * <p>
     * The received message count is incremented (a {@code null} count is
     * treated as zero). {@link #onFirstMessage(ProcessContext)} is invoked for
     * the first message; then, depending on how the new count compares with
     * {@link ActivityRules#getExpectedMessages()}, either
     * {@link #onExpectedMessage(ProcessContext)} (equal) or
     * {@link #onExcessMessage(ProcessContext)} (greater) is invoked.
     *
     * @param activityRules the rules supplying the expected number of messages
     * @param context the context notified of start/completion events
     * @throws Exception declared on the signature; this method does not throw
     *                   explicitly itself
     */
    public synchronized void processExchange(ActivityRules activityRules, ProcessContext context) throws Exception {
        Integer previousCount = getReceivedMessageCount();
        int messageCount = (previousCount == null ? 0 : previousCount.intValue()) + 1;
        setReceivedMessageCount(messageCount);

        if (messageCount == 1) {
            onFirstMessage(context);
        }
        int expectedMessages = activityRules.getExpectedMessages();
        if (messageCount == expectedMessages) {
            onExpectedMessage(context);
        } else if (messageCount > expectedMessages) {
            onExcessMessage(context);
        }
    }

    /**
     * Returns true if this state is for the given activity, i.e. if this
     * state's activity definition equals the one of the given rules.
     *
     * @param activityRules the rules whose activity definition is compared
     * @return the result of comparing the two definitions with
     *         {@code ObjectHelper.equals}
     */
    public boolean isActivity(ActivityRules activityRules) {
        return ObjectHelper.equals(getActivityDefinition(), activityRules.getActivityDefinition());
    }

    // Properties
    //-----------------------------------------------------------------------
    @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST})
    public ProcessInstance getProcessInstance() {
        return processInstance;
    }

    /**
     * Sets the owning process instance and, when it is not {@code null},
     * registers this state in that instance's activity states.
     *
     * @param processInstance the owning process instance; may be {@code null},
     *                        in which case only the reference is cleared
     */
    public void setProcessInstance(ProcessInstance processInstance) {
        this.processInstance = processInstance;
        // Guard against null so clearing the association does not throw a
        // NullPointerException.
        if (processInstance != null) {
            processInstance.getActivityStates().add(this);
        }
    }

    @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST})
    public ActivityDefinition getActivityDefinition() {
        return activityDefinition;
    }

    public void setActivityDefinition(ActivityDefinition activityDefinition) {
        this.activityDefinition = activityDefinition;
    }

    public Integer getEscalationLevel() {
        return escalationLevel;
    }

    public void setEscalationLevel(Integer escalationLevel) {
        this.escalationLevel = escalationLevel;
    }

    public Integer getReceivedMessageCount() {
        return receivedMessageCount;
    }

    public void setReceivedMessageCount(Integer receivedMessageCount) {
        this.receivedMessageCount = receivedMessageCount;
    }

    public Date getTimeExpected() {
        return timeExpected;
    }

    public void setTimeExpected(Date timeExpected) {
        this.timeExpected = timeExpected;
    }

    public Date getTimeOverdue() {
        return timeOverdue;
    }

    public void setTimeOverdue(Date timeOverdue) {
        this.timeOverdue = timeOverdue;
    }

    /**
     * Sets the completion time and, when it is non-null, also sets the
     * escalation level to -1.
     * NOTE(review): -1 presumably marks the state as no longer subject to
     * escalation - confirm against the escalation logic.
     *
     * @param timeCompleted the completion time, or {@code null} if not completed
     */
    @Override
    public void setTimeCompleted(Date timeCompleted) {
        super.setTimeCompleted(timeCompleted);
        if (timeCompleted != null) {
            setEscalationLevel(-1);
        }
    }

    // Implementation methods
    //-----------------------------------------------------------------------

    /**
     * Called when the first message is received. If this state has not been
     * started yet, stamps the start time and notifies the context.
     *
     * @param context the context to notify via {@code onStarted}
     */
    protected void onFirstMessage(ProcessContext context) {
        if (!isStarted()) {
            setTimeStarted(currentTime());
            context.onStarted(this);
        }
    }

    /**
     * Called when the expected number of messages is reached. If this state
     * has not been completed yet, stamps the completion time and notifies the
     * context.
     *
     * @param context the context to notify via {@code onCompleted}
     */
    protected void onExpectedMessage(ProcessContext context) {
        if (!isCompleted()) {
            setTimeCompleted(currentTime());
            context.onCompleted(this);
        }
    }

    /**
     * Called when an excess message (after the expected number of messages)
     * is received. Currently does nothing.
     *
     * @param context the current process context (unused for now)
     */
    protected void onExcessMessage(ProcessContext context) {
        // TODO
    }

    /**
     * Returns the current time used to stamp start and completion events.
     *
     * @return a new {@link Date} representing now
     */
    protected Date currentTime() {
        return new Date();
    }
}