001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.bam.rules; 018 019 import java.util.ArrayList; 020 import java.util.Date; 021 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Processor; 024 import org.apache.camel.Route; 025 import org.apache.camel.bam.TimeExpression; 026 import org.apache.camel.bam.model.ActivityState; 027 import org.apache.camel.bam.model.ProcessInstance; 028 import org.apache.camel.impl.DefaultExchange; 029 import org.apache.camel.impl.RouteContext; 030 import org.apache.camel.impl.ServiceSupport; 031 import org.apache.camel.model.OutputType; 032 import org.apache.camel.model.RouteType; 033 import org.apache.camel.util.Time; 034 import org.apache.commons.logging.Log; 035 import org.apache.commons.logging.LogFactory; 036 037 import static org.apache.camel.util.ServiceHelper.startServices; 038 import static org.apache.camel.util.ServiceHelper.stopServices; 039 040 /** 041 * A temporal rule for use within BAM 042 * 043 * @version $Revision: $ 044 */ 045 public class TemporalRule extends ServiceSupport { 046 private static final transient Log LOG = LogFactory.getLog(TemporalRule.class); 047 private TimeExpression first; 048 private TimeExpression second; 049 private long expectedMillis; 050 private long overdueMillis; 051 private Processor overdueAction; 052 private OutputType overdueProcessors = new OutputType(); 053 054 public TemporalRule(TimeExpression first, TimeExpression second) { 055 this.first = first; 056 this.second = second; 057 } 058 059 public TemporalRule expectWithin(Time builder) { 060 return expectWithin(builder.toMillis()); 061 } 062 063 public TemporalRule expectWithin(long millis) { 064 expectedMillis = millis; 065 return this; 066 } 067 068 public OutputType errorIfOver(Time builder) { 069 return errorIfOver(builder.toMillis()); 070 } 071 072 public OutputType errorIfOver(long millis) { 073 overdueMillis = millis; 074 if (overdueProcessors == null) { 075 overdueProcessors = new OutputType(); 076 } 077 return overdueProcessors; 078 } 079 080 public TimeExpression getFirst() { 081 return first; 082 } 083 084 public TimeExpression getSecond() { 085 return second; 086 } 087 088 public Processor getOverdueAction() throws Exception { 089 if (overdueAction == null && overdueProcessors != null) { 090 091 // TOOD refactor to avoid this messyness... 092 ArrayList<Route> list = new ArrayList<Route>(); 093 RouteType route = new RouteType(); 094 route.setCamelContext(first.getBuilder().getProcessBuilder().getContext()); 095 RouteContext routeContext = new RouteContext(route, null, list); 096 097 overdueAction = overdueProcessors.createOutputsProcessor(routeContext); 098 } 099 return overdueAction; 100 } 101 102 public void processExchange(Exchange exchange, ProcessInstance instance) { 103 Date firstTime = first.evaluate(instance); 104 if (firstTime == null) { 105 // ignore as first event has not accurred yet 106 return; 107 } 108 109 // TODO now we might need to set the second activity state 110 // to 'grey' to indicate it now could happen? 111 112 // lets force the lazy creation of the second state 113 ActivityState secondState = second.getOrCreateActivityState(instance); 114 if (expectedMillis > 0L) { 115 Date expected = secondState.getTimeExpected(); 116 if (expected == null) { 117 expected = add(firstTime, expectedMillis); 118 secondState.setTimeExpected(expected); 119 } 120 } 121 if (overdueMillis > 0L) { 122 Date overdue = secondState.getTimeOverdue(); 123 if (overdue == null) { 124 overdue = add(firstTime, overdueMillis); 125 secondState.setTimeOverdue(overdue); 126 } 127 } 128 } 129 130 public void processExpired(ActivityState activityState) throws Exception { 131 Processor processor = getOverdueAction(); 132 if (processor != null) { 133 Date now = new Date(); 134 /* 135 TODO this doesn't work and returns null for some strange reason 136 ProcessInstance instance = activityState.getProcessInstance(); 137 ActivityState secondState = second.getActivityState(instance); 138 if (secondState == null) { 139 log.error("Could not find the second state! Process is: " 140 + instance + " with first state: " + first.getActivityState(instance) 141 + " and the state I was called with was: " + activityState); 142 } 143 */ 144 145 ActivityState secondState = activityState; 146 Date overdue = secondState.getTimeOverdue(); 147 if (now.compareTo(overdue) >= 0) { 148 Exchange exchange = createExchange(); 149 exchange.getIn().setBody(activityState); 150 processor.process(exchange); 151 } else { 152 LOG.warn("Process has not actually expired; the time is: " + now + " but the overdue time is: " + overdue); 153 } 154 } 155 } 156 157 protected Exchange createExchange() { 158 return new DefaultExchange(second.getBuilder().getProcessBuilder().getContext()); 159 } 160 161 /** 162 * Returns the date in the future adding the given number of millis 163 * 164 * @param date 165 * @param millis 166 * @return the date in the future 167 */ 168 protected Date add(Date date, long millis) { 169 return new Date(date.getTime() + millis); 170 } 171 172 protected void doStart() throws Exception { 173 startServices(getOverdueAction()); 174 } 175 176 protected void doStop() throws Exception { 177 stopServices(getOverdueAction()); 178 } 179 }