/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.bam.rules;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.bam.TimeExpression;
import org.apache.camel.bam.model.ActivityState;
import org.apache.camel.bam.model.ProcessInstance;
import org.apache.camel.builder.FromBuilder;
import org.apache.camel.builder.ProcessorFactory;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import static org.apache.camel.util.ServiceHelper.startServices;
import static org.apache.camel.util.ServiceHelper.stopServices;
import org.apache.camel.util.Time;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.Date;

/**
 * A temporal rule for use within BAM.
 * <p>
 * The rule relates two {@link TimeExpression}s: once the first one yields a
 * time for a process instance, the activity state of the second one is given
 * an expected time and/or an overdue time relative to it. When an activity
 * state is reported as expired, the configured overdue action (if any) is
 * invoked.
 *
 * @version $Revision: $
 */
public class TemporalRule extends ServiceSupport {
    private static final Log log = LogFactory.getLog(TemporalRule.class);
    private TimeExpression first;
    private TimeExpression second;
    private long expectedMillis;
    private long overdueMillis;
    private Processor overdueAction;
    private ProcessorFactory overdueProcessorFactory;

    /**
     * Creates a rule relating the two given time expressions.
     *
     * @param first  the expression whose time is used as the base time
     * @param second the expression whose activity state receives the
     *               expected and overdue times
     */
    public TemporalRule(TimeExpression first, TimeExpression second) {
        this.first = first;
        this.second = second;
    }

    /**
     * Sets the expected duration between the first and second times.
     *
     * @param builder the duration, converted using {@code toMillis()}
     * @return this rule, to allow call chaining
     */
    public TemporalRule expectWithin(Time builder) {
        return expectWithin(builder.toMillis());
    }

    /**
     * Sets the expected duration between the first and second times.
     *
     * @param millis the duration in milliseconds; values that are not
     *               greater than zero disable the expected time
     * @return this rule, to allow call chaining
     */
    public TemporalRule expectWithin(long millis) {
        expectedMillis = millis;
        return this;
    }

    /**
     * Sets the duration after which the second activity is overdue.
     *
     * @param builder the duration, converted using {@code toMillis()}
     * @return the builder used to define the overdue action
     */
    public FromBuilder errorIfOver(Time builder) {
        return errorIfOver(builder.toMillis());
    }

    /**
     * Sets the duration after which the second activity is overdue and
     * creates the builder from which the overdue action is later created.
     *
     * @param millis the duration in milliseconds; values that are not
     *               greater than zero disable the overdue time
     * @return the builder used to define the overdue action
     */
    public FromBuilder errorIfOver(long millis) {
        overdueMillis = millis;

        FromBuilder builder = new FromBuilder(second.getBuilder().getProcessBuilder(), null);
        // the builder is kept as the factory which lazily creates the overdue action
        overdueProcessorFactory = builder;
        return builder;
    }

    /**
     * @return the first time expression passed to the constructor
     */
    public TimeExpression getFirst() {
        return first;
    }

    /**
     * @return the second time expression passed to the constructor
     */
    public TimeExpression getSecond() {
        return second;
    }

    /**
     * Returns the overdue action, lazily creating it from the processor
     * factory on first use.
     *
     * @return the overdue action, or null if none has been created and no
     *         processor factory has been configured
     * @throws Exception if the factory fails to create the processor
     */
    public Processor getOverdueAction() throws Exception {
        if (overdueAction == null && overdueProcessorFactory != null) {
            overdueAction = overdueProcessorFactory.createProcessor();
        }
        return overdueAction;
    }

    /**
     * Updates the expected and overdue times of the second activity state
     * for the given process instance, based on the time of the first
     * expression. Times which are already set are left untouched.
     *
     * @param exchange the exchange being processed (not used by this method)
     * @param instance the process instance to evaluate the expressions on
     */
    public void processExchange(Exchange exchange, ProcessInstance instance) {
        Date firstTime = first.evaluate(instance);
        if (firstTime == null) {
            // ignore as first event has not occurred yet
            return;
        }

        // TODO now we might need to set the second activity state
        // to 'grey' to indicate it now could happen?

        // lets force the lazy creation of the second state
        ActivityState secondState = second.getOrCreateActivityState(instance);
        if (expectedMillis > 0L) {
            Date expected = secondState.getTimeExpected();
            if (expected == null) {
                expected = add(firstTime, expectedMillis);
                secondState.setTimeExpected(expected);
            }
        }
        if (overdueMillis > 0L) {
            Date overdue = secondState.getTimeOverdue();
            if (overdue == null) {
                overdue = add(firstTime, overdueMillis);
                secondState.setTimeOverdue(overdue);
            }
        }
    }

    /**
     * Invokes the overdue action for the given activity state if its overdue
     * time has been reached. Does nothing if there is no overdue action.
     *
     * @param activityState the state reported as expired; it is set as the
     *                      body of the exchange given to the overdue action
     * @throws Exception if the overdue action cannot be created or fails
     */
    public void processExpired(ActivityState activityState) throws Exception {
        Processor processor = getOverdueAction();
        if (processor == null) {
            return;
        }

        Date now = new Date();

        // TODO looking up the second state through
        // second.getActivityState(activityState.getProcessInstance())
        // doesn't work and returns null for some strange reason,
        // so the state we were called with is used instead
        Date overdue = activityState.getTimeOverdue();
        if (overdue == null) {
            // the overdue time is only set when overdueMillis > 0, so guard
            // against a missing value rather than failing in compareTo()
            log.warn("Process has no overdue time so cannot be checked for expiry; the state is: " + activityState);
            return;
        }

        if (now.compareTo(overdue) >= 0) {
            Exchange exchange = createExchange();
            exchange.getIn().setBody(activityState);
            processor.process(exchange);
        }
        else {
            log.warn("Process has not actually expired; the time is: " + now + " but the overdue time is: " + overdue);
        }
    }

    /**
     * Creates the exchange passed to the overdue action, using the context
     * of the process builder of the second expression.
     *
     * @return a new exchange
     */
    protected Exchange createExchange() {
        return new DefaultExchange(second.getBuilder().getProcessBuilder().getContext());
    }

    /**
     * Returns the date in the future adding the given number of millis
     *
     * @param date   the base date
     * @param millis the number of milliseconds to add
     * @return the date in the future
     */
    protected Date add(Date date, long millis) {
        return new Date(date.getTime() + millis);
    }

    /**
     * Starts the overdue action, creating it first if necessary.
     *
     * @throws Exception if the overdue action cannot be created or started
     */
    protected void doStart() throws Exception {
        startServices(getOverdueAction());
    }

    /**
     * Stops the overdue action.
     *
     * @throws Exception if the overdue action cannot be obtained or stopped
     */
    protected void doStop() throws Exception {
        stopServices(getOverdueAction());
    }
}