001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.model;
018    
019    import org.apache.camel.bam.processor.ProcessContext;
020    import org.apache.camel.bam.rules.ActivityRules;
021    import org.apache.camel.util.ObjectHelper;
022    
023    import javax.persistence.CascadeType;
024    import javax.persistence.Entity;
025    import javax.persistence.FetchType;
026    import javax.persistence.GeneratedValue;
027    import javax.persistence.Id;
028    import javax.persistence.ManyToOne;
029    import java.util.Date;
030    
031    /**
032     * The default state for a specific activity within a process
033     *
034     * @version $Revision: $
035     */
036    @Entity
037    public class ActivityState extends TemporalEntity {
038        private ProcessInstance processInstance;
039        private Integer receivedMessageCount = 0;
040        private ActivityDefinition activityDefinition;
041        private Date timeExpected;
042        private Date timeOverdue;
043        private Integer escalationLevel = 0;
044    
045        // This crap is required to work around a bug in hibernate
046        @Override
047        @Id
048        @GeneratedValue
049        public Long getId() {
050            return super.getId();
051        }
052    
053        @Override
054        public String toString() {
055            return "ActivityState[" + getId() + " " + getActivityDefinition() + "]";
056        }
057    
058        public synchronized void processExchange(ActivityRules activityRules, ProcessContext context) throws Exception {
059            int messageCount = 0;
060            Integer count = getReceivedMessageCount();
061            if (count != null) {
062                messageCount = count.intValue();
063            }
064            setReceivedMessageCount(++messageCount);
065    
066            if (messageCount == 1) {
067                onFirstMessage(context);
068            }
069            int expectedMessages = activityRules.getExpectedMessages();
070            if (messageCount == expectedMessages) {
071                onExpectedMessage(context);
072            }
073            else if (messageCount > expectedMessages) {
074                onExcessMessage(context);
075            }
076        }
077    
078        /**
079         * Returns true if this state is for the given activity
080         */
081        public boolean isActivity(ActivityRules activityRules) {
082            return ObjectHelper.equals(getActivityDefinition(), activityRules.getActivityDefinition());
083        }
084    
085        // Properties
086        //-----------------------------------------------------------------------
087        @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST})
088        public ProcessInstance getProcessInstance() {
089            return processInstance;
090        }
091    
092        public void setProcessInstance(ProcessInstance processInstance) {
093            this.processInstance = processInstance;
094            processInstance.getActivityStates().add(this);
095        }
096    
097        @ManyToOne(fetch = FetchType.LAZY, cascade = {CascadeType.PERSIST})
098        public ActivityDefinition getActivityDefinition() {
099            return activityDefinition;
100        }
101    
102        public void setActivityDefinition(ActivityDefinition activityDefinition) {
103            this.activityDefinition = activityDefinition;
104        }
105    
106        public Integer getEscalationLevel() {
107            return escalationLevel;
108        }
109    
110        public void setEscalationLevel(Integer escalationLevel) {
111            this.escalationLevel = escalationLevel;
112        }
113    
114        public Integer getReceivedMessageCount() {
115            return receivedMessageCount;
116        }
117    
118        public void setReceivedMessageCount(Integer receivedMessageCount) {
119            this.receivedMessageCount = receivedMessageCount;
120        }
121    
122        public Date getTimeExpected() {
123            return timeExpected;
124        }
125    
126        public void setTimeExpected(Date timeExpected) {
127            this.timeExpected = timeExpected;
128        }
129    
130        public Date getTimeOverdue() {
131            return timeOverdue;
132        }
133    
134        public void setTimeOverdue(Date timeOverdue) {
135            this.timeOverdue = timeOverdue;
136        }
137    
138        public void setTimeCompleted(Date timeCompleted) {
139            super.setTimeCompleted(timeCompleted);
140            if (timeCompleted != null) {
141                setEscalationLevel(-1);
142            }
143        }
144    
145        // Implementation methods
146        //-----------------------------------------------------------------------
147    
148        /**
149         * Called when the first message is reached
150         */
151        protected void onFirstMessage(ProcessContext context) {
152            if (!isStarted()) {
153                setTimeStarted(currentTime());
154                context.onStarted(this);
155            }
156        }
157    
158        /**
159         * Called when the expected number of messages are is reached
160         */
161        protected void onExpectedMessage(ProcessContext context) {
162            if (!isCompleted()) {
163                setTimeCompleted(currentTime());
164                context.onCompleted(this);
165            }
166        }
167    
168        /**
169         * Called when an excess message (after the expected number of messages)
170         * are received
171         */
172        protected void onExcessMessage(ProcessContext context) {
173            // TODO
174        }
175    
176        protected Date currentTime() {
177            return new Date();
178        }
179    }