001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.bam.model; 018 019 import org.apache.camel.bam.processor.ProcessContext; 020 import org.apache.camel.bam.rules.ActivityRules; 021 import org.apache.camel.util.ObjectHelper; 022 023 import javax.persistence.CascadeType; 024 import javax.persistence.Entity; 025 import javax.persistence.FetchType; 026 import javax.persistence.GeneratedValue; 027 import javax.persistence.Id; 028 import javax.persistence.ManyToOne; 029 import javax.persistence.Temporal; 030 import javax.persistence.TemporalType; 031 import javax.persistence.Transient; 032 import java.util.Date; 033 034 /** 035 * The default state for a specific activity within a process 036 * 037 * @version $Revision: $ 038 */ 039 @Entity 040 public class ActivityState extends TemporalEntity { 041 private ProcessInstance processInstance; 042 private Integer receivedMessageCount = 0; 043 private ActivityDefinition activityDefinition; 044 private Date timeExpected; 045 @Temporal(TemporalType.TIME) 046 private Date timeOverdue; 047 private Integer escalationLevel = 0; 048 049 // This crap is required to work around a bug in hibernate 050 @Override 051 @Id 052 @GeneratedValue 053 public Long getId() { 054 return super.getId(); 055 } 056 057 @Override 058 public String toString() { 059 return "ActivityState[" + getId() + " on " + getProcessInstance() + " " + getActivityDefinition() + "]"; 060 } 061 062 public synchronized void processExchange(ActivityRules activityRules, ProcessContext context) throws Exception { 063 int messageCount = 0; 064 Integer count = getReceivedMessageCount(); 065 if (count != null) { 066 messageCount = count.intValue(); 067 } 068 setReceivedMessageCount(++messageCount); 069 070 if (messageCount == 1) { 071 onFirstMessage(context); 072 } 073 int expectedMessages = activityRules.getExpectedMessages(); 074 if (messageCount == expectedMessages) { 075 onExpectedMessage(context); 076 } else if (messageCount > expectedMessages) { 077 onExcessMessage(context); 078 } 079 } 080 081 /** 082 * Returns true if this state is for the given activity 083 */ 084 public boolean isActivity(ActivityRules activityRules) { 085 return ObjectHelper.equals(getActivityDefinition(), activityRules.getActivityDefinition()); 086 } 087 088 // Properties 089 // ----------------------------------------------------------------------- 090 @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST }) 091 public ProcessInstance getProcessInstance() { 092 return processInstance; 093 } 094 095 public void setProcessInstance(ProcessInstance processInstance) { 096 this.processInstance = processInstance; 097 processInstance.getActivityStates().add(this); 098 } 099 100 @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST }) 101 public ActivityDefinition getActivityDefinition() { 102 return activityDefinition; 103 } 104 105 public void setActivityDefinition(ActivityDefinition activityDefinition) { 106 this.activityDefinition = activityDefinition; 107 } 108 109 public Integer getEscalationLevel() { 110 return escalationLevel; 111 } 112 113 public void setEscalationLevel(Integer escalationLevel) { 114 this.escalationLevel = escalationLevel; 115 } 116 117 public Integer getReceivedMessageCount() { 118 return receivedMessageCount; 119 } 120 121 public void setReceivedMessageCount(Integer receivedMessageCount) { 122 this.receivedMessageCount = receivedMessageCount; 123 } 124 125 @Temporal(TemporalType.TIME) 126 public Date getTimeExpected() { 127 return timeExpected; 128 } 129 130 public void setTimeExpected(Date timeExpected) { 131 this.timeExpected = timeExpected; 132 } 133 134 @Temporal(TemporalType.TIME) 135 public Date getTimeOverdue() { 136 return timeOverdue; 137 } 138 139 public void setTimeOverdue(Date timeOverdue) { 140 this.timeOverdue = timeOverdue; 141 } 142 143 public void setTimeCompleted(Date timeCompleted) { 144 super.setTimeCompleted(timeCompleted); 145 if (timeCompleted != null) { 146 setEscalationLevel(-1); 147 } 148 } 149 150 @Transient 151 public String getCorrelationKey() { 152 ProcessInstance pi = getProcessInstance(); 153 if (pi == null) { 154 return null; 155 } 156 return pi.getCorrelationKey(); 157 } 158 159 // Implementation methods 160 // ----------------------------------------------------------------------- 161 162 /** 163 * Called when the first message is reached 164 */ 165 protected void onFirstMessage(ProcessContext context) { 166 if (!isStarted()) { 167 setTimeStarted(currentTime()); 168 context.onStarted(this); 169 } 170 } 171 172 /** 173 * Called when the expected number of messages are is reached 174 */ 175 protected void onExpectedMessage(ProcessContext context) { 176 if (!isCompleted()) { 177 setTimeCompleted(currentTime()); 178 context.onCompleted(this); 179 } 180 } 181 182 /** 183 * Called when an excess message (after the expected number of messages) are 184 * received 185 */ 186 protected void onExcessMessage(ProcessContext context) { 187 // TODO 188 } 189 190 protected Date currentTime() { 191 return new Date(); 192 } 193 }