Coverage Report - org.apache.camel.bam.rules.TemporalRule
 
Classes in this File Line Coverage Branch Coverage Complexity
TemporalRule
91% 
100% 
0
 
 1  
 /**
 2  
  * Licensed to the Apache Software Foundation (ASF) under one or more
 3  
  * contributor license agreements.  See the NOTICE file distributed with
 4  
  * this work for additional information regarding copyright ownership.
 5  
  * The ASF licenses this file to You under the Apache License, Version 2.0
 6  
  * (the "License"); you may not use this file except in compliance with
 7  
  * the License.  You may obtain a copy of the License at
 8  
  *
 9  
  *      http://www.apache.org/licenses/LICENSE-2.0
 10  
  *
 11  
  * Unless required by applicable law or agreed to in writing, software
 12  
  * distributed under the License is distributed on an "AS IS" BASIS,
 13  
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 14  
  * See the License for the specific language governing permissions and
 15  
  * limitations under the License.
 16  
  */
 17  
 package org.apache.camel.bam.rules;
 18  
 
 19  
 import java.util.ArrayList;
 20  
 import java.util.Date;
 21  
 
 22  
 import org.apache.camel.Exchange;
 23  
 import org.apache.camel.Processor;
 24  
 import org.apache.camel.Route;
 25  
 import org.apache.camel.bam.TimeExpression;
 26  
 import org.apache.camel.bam.model.ActivityState;
 27  
 import org.apache.camel.bam.model.ProcessInstance;
 28  
 import org.apache.camel.impl.DefaultExchange;
 29  
 import org.apache.camel.impl.RouteContext;
 30  
 import org.apache.camel.impl.ServiceSupport;
 31  
 import org.apache.camel.model.OutputType;
 32  
 import org.apache.camel.model.RouteType;
 33  
 import org.apache.camel.util.Time;
 34  
 import org.apache.commons.logging.Log;
 35  
 import org.apache.commons.logging.LogFactory;
 36  
 
 37  
 import static org.apache.camel.util.ServiceHelper.startServices;
 38  
 import static org.apache.camel.util.ServiceHelper.stopServices;
 39  
 
 40  
 /**
 41  
  * A temporal rule for use within BAM
 42  
  *
 43  
  * @version $Revision: $
 44  
  */
 45  
 public class TemporalRule extends ServiceSupport {
 46  2
     private static final transient Log LOG = LogFactory.getLog(TemporalRule.class);
 47  
     private TimeExpression first;
 48  
     private TimeExpression second;
 49  
     private long expectedMillis;
 50  
     private long overdueMillis;
 51  
     private Processor overdueAction;
 52  2
     private OutputType overdueProcessors = new OutputType();
 53  
 
 54  2
     public TemporalRule(TimeExpression first, TimeExpression second) {
 55  2
         this.first = first;
 56  2
         this.second = second;
 57  2
     }
 58  
 
 59  
     public TemporalRule expectWithin(Time builder) {
 60  2
         return expectWithin(builder.toMillis());
 61  
     }
 62  
 
 63  
     public TemporalRule expectWithin(long millis) {
 64  2
         expectedMillis = millis;
 65  2
         return this;
 66  
     }
 67  
 
 68  
     public OutputType errorIfOver(Time builder) {
 69  2
         return errorIfOver(builder.toMillis());
 70  
     }
 71  
 
 72  
     public OutputType errorIfOver(long millis) {
 73  2
         overdueMillis = millis;
 74  2
         if (overdueProcessors == null) {
 75  0
             overdueProcessors = new OutputType();
 76  
         }
 77  2
         return overdueProcessors;
 78  
     }
 79  
 
 80  
     public TimeExpression getFirst() {
 81  0
         return first;
 82  
     }
 83  
 
 84  
     public TimeExpression getSecond() {
 85  2
         return second;
 86  
     }
 87  
 
 88  
     public Processor getOverdueAction() throws Exception {
 89  4
         if (overdueAction == null && overdueProcessors != null) {
 90  
 
 91  
             // TOOD refactor to avoid this messyness...
 92  2
             ArrayList<Route> list = new ArrayList<Route>();
 93  2
             RouteType route = new RouteType();
 94  2
             route.setCamelContext(first.getBuilder().getProcessBuilder().getContext());
 95  2
             RouteContext routeContext = new RouteContext(route, null, list);
 96  
 
 97  2
             overdueAction = overdueProcessors.createOutputsProcessor(routeContext);
 98  
         }
 99  4
         return overdueAction;
 100  
     }
 101  
 
 102  
     public void processExchange(Exchange exchange, ProcessInstance instance) {
 103  6
         Date firstTime = first.evaluate(instance);
 104  6
         if (firstTime == null) {
 105  
             // ignore as first event has not accurred yet
 106  2
             return;
 107  
         }
 108  
 
 109  
         // TODO now we might need to set the second activity state
 110  
         // to 'grey' to indicate it now could happen?
 111  
 
 112  
         // lets force the lazy creation of the second state
 113  4
         ActivityState secondState = second.getOrCreateActivityState(instance);
 114  4
         if (expectedMillis > 0L) {
 115  4
             Date expected = secondState.getTimeExpected();
 116  4
             if (expected == null) {
 117  4
                 expected = add(firstTime, expectedMillis);
 118  4
                 secondState.setTimeExpected(expected);
 119  
             }
 120  
         }
 121  4
         if (overdueMillis > 0L) {
 122  4
             Date overdue = secondState.getTimeOverdue();
 123  4
             if (overdue == null) {
 124  4
                 overdue = add(firstTime, overdueMillis);
 125  4
                 secondState.setTimeOverdue(overdue);
 126  
             }
 127  
         }
 128  4
     }
 129  
 
 130  
     public void processExpired(ActivityState activityState) throws Exception {
 131  2
         Processor processor = getOverdueAction();
 132  2
         if (processor != null) {
 133  2
             Date now = new Date();
 134  
 /*
 135  
             TODO this doesn't work and returns null for some strange reason
 136  
             ProcessInstance instance = activityState.getProcessInstance();
 137  
             ActivityState secondState = second.getActivityState(instance);
 138  
             if (secondState == null) {
 139  
                 log.error("Could not find the second state! Process is: " 
 140  
                 + instance + " with first state: " + first.getActivityState(instance) 
 141  
                 + " and the state I was called with was: " + activityState);
 142  
             }
 143  
 */
 144  
 
 145  2
             ActivityState secondState = activityState;
 146  2
             Date overdue = secondState.getTimeOverdue();
 147  2
             if (now.compareTo(overdue) >= 0) {
 148  2
                 Exchange exchange = createExchange();
 149  2
                 exchange.getIn().setBody(activityState);
 150  2
                 processor.process(exchange);
 151  2
             } else {
 152  0
                 LOG.warn("Process has not actually expired; the time is: " + now + " but the overdue time is: " + overdue);
 153  
             }
 154  
         }
 155  2
     }
 156  
 
 157  
     protected Exchange createExchange() {
 158  2
         return new DefaultExchange(second.getBuilder().getProcessBuilder().getContext());
 159  
     }
 160  
 
 161  
     /**
 162  
      * Returns the date in the future adding the given number of millis
 163  
      *
 164  
      * @param date
 165  
      * @param millis
 166  
      * @return the date in the future
 167  
      */
 168  
     protected Date add(Date date, long millis) {
 169  8
         return new Date(date.getTime() + millis);
 170  
     }
 171  
 
 172  
     protected void doStart() throws Exception {
 173  2
         startServices(getOverdueAction());
 174  2
     }
 175  
 
 176  
     protected void doStop() throws Exception {
 177  0
         stopServices(getOverdueAction());
 178  0
     }
 179  
 }