001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.model;
018    
019    import org.apache.camel.bam.processor.ProcessContext;
020    import org.apache.camel.bam.rules.ActivityRules;
021    import org.apache.camel.util.ObjectHelper;
022    
023    import javax.persistence.CascadeType;
024    import javax.persistence.Entity;
025    import javax.persistence.FetchType;
026    import javax.persistence.GeneratedValue;
027    import javax.persistence.Id;
028    import javax.persistence.ManyToOne;
029    import javax.persistence.Temporal;
030    import javax.persistence.TemporalType;
031    import javax.persistence.Transient;
032    import java.util.Date;
033    
034    /**
035     * The default state for a specific activity within a process
036     * 
037     * @version $Revision: $
038     */
039    @Entity
040    public class ActivityState extends TemporalEntity {
041        private ProcessInstance processInstance;
042        private Integer receivedMessageCount = 0;
043        private ActivityDefinition activityDefinition;
044        private Date timeExpected;
045        @Temporal(TemporalType.TIME)
046        private Date timeOverdue;
047        private Integer escalationLevel = 0;
048    
049        // This crap is required to work around a bug in hibernate
050        @Override
051        @Id
052        @GeneratedValue
053        public Long getId() {
054            return super.getId();
055        }
056    
057        @Override
058        public String toString() {
059            return "ActivityState[" + getId() + " on " + getProcessInstance() + " " + getActivityDefinition() + "]";
060        }
061    
062        public synchronized void processExchange(ActivityRules activityRules, ProcessContext context) throws Exception {
063            int messageCount = 0;
064            Integer count = getReceivedMessageCount();
065            if (count != null) {
066                messageCount = count.intValue();
067            }
068            setReceivedMessageCount(++messageCount);
069    
070            if (messageCount == 1) {
071                onFirstMessage(context);
072            }
073            int expectedMessages = activityRules.getExpectedMessages();
074            if (messageCount == expectedMessages) {
075                onExpectedMessage(context);
076            } else if (messageCount > expectedMessages) {
077                onExcessMessage(context);
078            }
079        }
080    
081        /**
082         * Returns true if this state is for the given activity
083         */
084        public boolean isActivity(ActivityRules activityRules) {
085            return ObjectHelper.equals(getActivityDefinition(), activityRules.getActivityDefinition());
086        }
087    
088        // Properties
089        // -----------------------------------------------------------------------
090        @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST })
091        public ProcessInstance getProcessInstance() {
092            return processInstance;
093        }
094    
095        public void setProcessInstance(ProcessInstance processInstance) {
096            this.processInstance = processInstance;
097            processInstance.getActivityStates().add(this);
098        }
099    
100        @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST })
101        public ActivityDefinition getActivityDefinition() {
102            return activityDefinition;
103        }
104    
105        public void setActivityDefinition(ActivityDefinition activityDefinition) {
106            this.activityDefinition = activityDefinition;
107        }
108    
109        public Integer getEscalationLevel() {
110            return escalationLevel;
111        }
112    
113        public void setEscalationLevel(Integer escalationLevel) {
114            this.escalationLevel = escalationLevel;
115        }
116    
117        public Integer getReceivedMessageCount() {
118            return receivedMessageCount;
119        }
120    
121        public void setReceivedMessageCount(Integer receivedMessageCount) {
122            this.receivedMessageCount = receivedMessageCount;
123        }
124    
125        @Temporal(TemporalType.TIME)
126        public Date getTimeExpected() {
127            return timeExpected;
128        }
129    
130        public void setTimeExpected(Date timeExpected) {
131            this.timeExpected = timeExpected;
132        }
133    
134        @Temporal(TemporalType.TIME)
135        public Date getTimeOverdue() {
136            return timeOverdue;
137        }
138    
139        public void setTimeOverdue(Date timeOverdue) {
140            this.timeOverdue = timeOverdue;
141        }
142    
143        public void setTimeCompleted(Date timeCompleted) {
144            super.setTimeCompleted(timeCompleted);
145            if (timeCompleted != null) {
146                setEscalationLevel(-1);
147            }
148        }
149    
150        @Transient
151        public String getCorrelationKey() {
152            ProcessInstance pi = getProcessInstance();
153            if (pi == null) {
154                return null;
155            }
156            return pi.getCorrelationKey();
157        }
158    
159        // Implementation methods
160        // -----------------------------------------------------------------------
161    
162        /**
163         * Called when the first message is reached
164         */
165        protected void onFirstMessage(ProcessContext context) {
166            if (!isStarted()) {
167                setTimeStarted(currentTime());
168                context.onStarted(this);
169            }
170        }
171    
172        /**
173         * Called when the expected number of messages are is reached
174         */
175        protected void onExpectedMessage(ProcessContext context) {
176            if (!isCompleted()) {
177                setTimeCompleted(currentTime());
178                context.onCompleted(this);
179            }
180        }
181    
182        /**
183         * Called when an excess message (after the expected number of messages) are
184         * received
185         */
186        protected void onExcessMessage(ProcessContext context) {
187            // TODO
188        }
189    
190        protected Date currentTime() {
191            return new Date();
192        }
193    }