001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.List; 022 import java.util.Map; 023 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Intercept; 028 import org.apache.camel.NoSuchEndpointException; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Route; 031 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter; 032 import org.apache.camel.model.FromDefinition; 033 import org.apache.camel.model.ProcessorDefinition; 034 import org.apache.camel.model.RouteDefinition; 035 import org.apache.camel.model.dataformat.DataFormatDefinition; 036 import org.apache.camel.processor.Interceptor; 037 import org.apache.camel.processor.Pipeline; 038 import org.apache.camel.processor.ProceedProcessor; 039 import org.apache.camel.processor.UnitOfWorkProcessor; 040 import org.apache.camel.spi.ErrorHandlerWrappingStrategy; 041 import org.apache.camel.spi.InterceptStrategy; 042 import org.apache.camel.spi.RouteContext; 043 044 /** 045 * The context used to activate new routing rules 046 * 047 * @version $Revision: 751221 $ 048 */ 049 public class DefaultRouteContext implements RouteContext { 050 private final RouteDefinition route; 051 private FromDefinition from; 052 private final Collection<Route> routes; 053 private Endpoint endpoint; 054 private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>(); 055 private Interceptor lastInterceptor; 056 private CamelContext camelContext; 057 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 058 private ErrorHandlerWrappingStrategy errorHandlerWrappingStrategy; 059 private boolean routeAdded; 060 061 public DefaultRouteContext(RouteDefinition route, FromDefinition from, Collection<Route> routes) { 062 this.route = route; 063 this.from = from; 064 this.routes = routes; 065 } 066 067 /** 068 * Only used for lazy construction from inside ExpressionType 069 */ 070 public DefaultRouteContext(CamelContext camelContext) { 071 this.camelContext = camelContext; 072 routes = new ArrayList<Route>(); 073 route = new RouteDefinition("temporary"); 074 } 075 076 public Endpoint getEndpoint() { 077 if (endpoint == null) { 078 endpoint = from.resolveEndpoint(this); 079 } 080 return endpoint; 081 } 082 083 public FromDefinition getFrom() { 084 return from; 085 } 086 087 public RouteDefinition getRoute() { 088 return route; 089 } 090 091 public CamelContext getCamelContext() { 092 if (camelContext == null) { 093 camelContext = getRoute().getCamelContext(); 094 } 095 return camelContext; 096 } 097 098 public Processor createProcessor(ProcessorDefinition node) throws Exception { 099 return node.createOutputsProcessor(this); 100 } 101 102 public Endpoint resolveEndpoint(String uri) { 103 return route.resolveEndpoint(uri); 104 } 105 106 public Endpoint resolveEndpoint(String uri, String ref) { 107 Endpoint endpoint = null; 108 if (uri != null) { 109 endpoint = resolveEndpoint(uri); 110 if (endpoint == null) { 111 throw new NoSuchEndpointException(uri); 112 } 113 } 114 if (ref != null) { 115 endpoint = lookup(ref, Endpoint.class); 116 if (endpoint == null) { 117 throw new NoSuchEndpointException("ref:" + ref); 118 } 119 } 120 if (endpoint == null) { 121 throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); 122 } else { 123 return endpoint; 124 } 125 } 126 127 public <T> T lookup(String name, Class<T> type) { 128 return getCamelContext().getRegistry().lookup(name, type); 129 } 130 131 @SuppressWarnings("unchecked") 132 public void commit() { 133 // now lets turn all of the event driven consumer processors into a 134 // single route 135 if (!eventDrivenProcessors.isEmpty()) { 136 Processor processor = Pipeline.newInstance(eventDrivenProcessors); 137 138 // lets create the async processor 139 final AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor); 140 Processor unitOfWorkProcessor = new UnitOfWorkProcessor(asyncProcessor); 141 142 // TODO: hz: move all this into the lifecycle strategy! (used by jmx naming strategy) 143 Route edcr = new EventDrivenConsumerRoute(getEndpoint(), unitOfWorkProcessor); 144 edcr.getProperties().put(Route.ID_PROPERTY, route.idOrCreate()); 145 edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode())); 146 if (route.getGroup() != null) { 147 edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup()); 148 } 149 routes.add(edcr); 150 } 151 } 152 153 public void addEventDrivenProcessor(Processor processor) { 154 eventDrivenProcessors.add(processor); 155 } 156 157 public void intercept(Intercept interceptor) { 158 lastInterceptor = (Interceptor)interceptor; 159 } 160 161 public Processor createProceedProcessor() { 162 if (lastInterceptor == null) { 163 throw new IllegalArgumentException("Cannot proceed() from outside of an interceptor!"); 164 } else { 165 return new ProceedProcessor(lastInterceptor); 166 } 167 } 168 169 public List<InterceptStrategy> getInterceptStrategies() { 170 return interceptStrategies; 171 } 172 173 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { 174 this.interceptStrategies = interceptStrategies; 175 } 176 177 public void addInterceptStrategy(InterceptStrategy interceptStrategy) { 178 getInterceptStrategies().add(interceptStrategy); 179 } 180 181 public ErrorHandlerWrappingStrategy getErrorHandlerWrappingStrategy() { 182 return errorHandlerWrappingStrategy; 183 } 184 185 public void setErrorHandlerWrappingStrategy(ErrorHandlerWrappingStrategy strategy) { 186 errorHandlerWrappingStrategy = strategy; 187 } 188 189 public boolean isRouteAdded() { 190 return routeAdded; 191 } 192 193 public void setIsRouteAdded(boolean routeAdded) { 194 this.routeAdded = routeAdded; 195 } 196 197 public DataFormatDefinition getDataFormat(String ref) { 198 Map<String, DataFormatDefinition> dataFormats = getCamelContext().getDataFormats(); 199 if (dataFormats != null) { 200 return dataFormats.get(ref); 201 } else { 202 return null; 203 } 204 } 205 }