001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.bam.rules;
018    
019    import java.util.ArrayList;
020    import java.util.Date;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Processor;
024    import org.apache.camel.Route;
025    import org.apache.camel.bam.TimeExpression;
026    import org.apache.camel.bam.model.ActivityState;
027    import org.apache.camel.bam.model.ProcessInstance;
028    import org.apache.camel.impl.DefaultExchange;
029    import org.apache.camel.impl.RouteContext;
030    import org.apache.camel.impl.ServiceSupport;
031    import org.apache.camel.model.OutputType;
032    import org.apache.camel.model.RouteType;
033    import org.apache.camel.util.Time;
034    import org.apache.commons.logging.Log;
035    import org.apache.commons.logging.LogFactory;
036    
037    import static org.apache.camel.util.ServiceHelper.startServices;
038    import static org.apache.camel.util.ServiceHelper.stopServices;
039    
040    /**
041     * A temporal rule for use within BAM
042     *
043     * @version $Revision: $
044     */
045    public class TemporalRule extends ServiceSupport {
046        private static final transient Log LOG = LogFactory.getLog(TemporalRule.class);
047        private TimeExpression first;
048        private TimeExpression second;
049        private long expectedMillis;
050        private long overdueMillis;
051        private Processor overdueAction;
052        private OutputType overdueProcessors = new OutputType();
053    
054        public TemporalRule(TimeExpression first, TimeExpression second) {
055            this.first = first;
056            this.second = second;
057        }
058    
059        public TemporalRule expectWithin(Time builder) {
060            return expectWithin(builder.toMillis());
061        }
062    
063        public TemporalRule expectWithin(long millis) {
064            expectedMillis = millis;
065            return this;
066        }
067    
068        public OutputType errorIfOver(Time builder) {
069            return errorIfOver(builder.toMillis());
070        }
071    
072        public OutputType errorIfOver(long millis) {
073            overdueMillis = millis;
074            if (overdueProcessors == null) {
075                overdueProcessors = new OutputType();
076            }
077            return overdueProcessors;
078        }
079    
080        public TimeExpression getFirst() {
081            return first;
082        }
083    
084        public TimeExpression getSecond() {
085            return second;
086        }
087    
088        public Processor getOverdueAction() throws Exception {
089            if (overdueAction == null && overdueProcessors != null) {
090    
091                // TOOD refactor to avoid this messyness...
092                ArrayList<Route> list = new ArrayList<Route>();
093                RouteType route = new RouteType();
094                route.setCamelContext(first.getBuilder().getProcessBuilder().getContext());
095                RouteContext routeContext = new RouteContext(route, null, list);
096    
097                overdueAction = overdueProcessors.createOutputsProcessor(routeContext);
098            }
099            return overdueAction;
100        }
101    
102        public void processExchange(Exchange exchange, ProcessInstance instance) {
103            Date firstTime = first.evaluate(instance);
104            if (firstTime == null) {
105                // ignore as first event has not accurred yet
106                return;
107            }
108    
109            // TODO now we might need to set the second activity state
110            // to 'grey' to indicate it now could happen?
111    
112            // lets force the lazy creation of the second state
113            ActivityState secondState = second.getOrCreateActivityState(instance);
114            if (expectedMillis > 0L) {
115                Date expected = secondState.getTimeExpected();
116                if (expected == null) {
117                    expected = add(firstTime, expectedMillis);
118                    secondState.setTimeExpected(expected);
119                }
120            }
121            if (overdueMillis > 0L) {
122                Date overdue = secondState.getTimeOverdue();
123                if (overdue == null) {
124                    overdue = add(firstTime, overdueMillis);
125                    secondState.setTimeOverdue(overdue);
126                }
127            }
128        }
129    
130        public void processExpired(ActivityState activityState) throws Exception {
131            Processor processor = getOverdueAction();
132            if (processor != null) {
133                Date now = new Date();
134    /*
135                TODO this doesn't work and returns null for some strange reason
136                ProcessInstance instance = activityState.getProcessInstance();
137                ActivityState secondState = second.getActivityState(instance);
138                if (secondState == null) {
139                    log.error("Could not find the second state! Process is: " 
140                    + instance + " with first state: " + first.getActivityState(instance) 
141                    + " and the state I was called with was: " + activityState);
142                }
143    */
144    
145                ActivityState secondState = activityState;
146                Date overdue = secondState.getTimeOverdue();
147                if (now.compareTo(overdue) >= 0) {
148                    Exchange exchange = createExchange();
149                    exchange.getIn().setBody(activityState);
150                    processor.process(exchange);
151                } else {
152                    LOG.warn("Process has not actually expired; the time is: " + now + " but the overdue time is: " + overdue);
153                }
154            }
155        }
156    
157        protected Exchange createExchange() {
158            return new DefaultExchange(second.getBuilder().getProcessBuilder().getContext());
159        }
160    
161        /**
162         * Returns the date in the future adding the given number of millis
163         *
164         * @param date
165         * @param millis
166         * @return the date in the future
167         */
168        protected Date add(Date date, long millis) {
169            return new Date(date.getTime() + millis);
170        }
171    
172        protected void doStart() throws Exception {
173            startServices(getOverdueAction());
174        }
175    
176        protected void doStop() throws Exception {
177            stopServices(getOverdueAction());
178        }
179    }