001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    
022    import org.apache.camel.Channel;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.Processor;
025    import org.apache.camel.impl.ServiceSupport;
026    import org.apache.camel.model.ProcessorDefinition;
027    import org.apache.camel.spi.InterceptStrategy;
028    import org.apache.camel.spi.RouteContext;
029    import org.apache.camel.util.ServiceHelper;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    /**
034     * DefaultChannel is the default {@link Channel}.
035     * <p/>
036     * The current implementation is just a composite containing the interceptors and error handler
037     * that beforehand was added to the route graph directly.
038     * <br/>
039     * With this {@link Channel} we can in the future implement better strategies for routing the
040     * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
041     * in the graph.
042     *
043     * @version $Revision: 771248 $
044     */
045    public class DefaultChannel extends ServiceSupport implements Processor, Channel {
046    
047        private static final transient Log LOG = LogFactory.getLog(DefaultChannel.class);
048    
049        private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
050        private Processor errorHandler;
051        // the next processor (non wrapped)
052        private Processor nextProcessor;
053        // the real output to invoke that has been wrapped
054        private Processor output;
055        private ProcessorDefinition definition;
056    
057        public List<Processor> next() {
058            List<Processor> answer = new ArrayList<Processor>(1);
059            answer.add(nextProcessor);
060            return answer;
061        }
062    
063        public boolean hasNext() {
064            return nextProcessor != null;
065        }
066    
067        public void setNextProcessor(Processor next) {
068            this.nextProcessor = next;
069        }
070    
071        public Processor getOutput() {
072            // the errorHandler is already decorated with interceptors
073            // so it cointain the entire chain of processors, so we can safely use it directly as output
074            // if no error handler provided we can use the output direcly
075            return errorHandler != null ? errorHandler : output;
076        }
077    
078        public void setOutput(Processor output) {
079            this.output = output;
080        }
081    
082        public Processor getNextProcessor() {
083            return nextProcessor;
084        }
085    
086        public boolean hasInterceptorStrategy(Class type) {
087            for (InterceptStrategy strategy : interceptors) {
088                if (type.isInstance(strategy)) {
089                    return true;
090                }
091            }
092            return false;
093        }
094    
095        public void setErrorHandler(Processor errorHandler) {
096            this.errorHandler = errorHandler;
097        }
098    
099        public Processor getErrorHandler() {
100            return errorHandler;
101        }
102    
103        public void addInterceptStrategy(InterceptStrategy strategy) {
104            interceptors.add(strategy);
105        }
106    
107        public void addInterceptStrategies(List<InterceptStrategy> strategies) {
108            interceptors.addAll(strategies);
109        }
110    
111        public List<InterceptStrategy> getInterceptStrategies() {
112            return interceptors;
113        }
114    
115        public ProcessorDefinition getProcessorDefinition() {
116            return definition;
117        }
118    
119        @Override
120        protected void doStart() throws Exception {
121            ServiceHelper.startServices(errorHandler, output);
122        }
123    
124        @Override
125        protected void doStop() throws Exception {
126            ServiceHelper.stopServices(output, errorHandler);
127        }
128    
129        public void initChannel(ProcessorDefinition outputDefinition, RouteContext routeContext) throws Exception {
130            this.definition = outputDefinition;
131    
132            // TODO: Support ordering of interceptors
133    
134            // wrap the output with the interceptors
135            Processor target = nextProcessor;
136            for (InterceptStrategy strategy : interceptors) {
137                target = strategy.wrapProcessorInInterceptors(outputDefinition, target, nextProcessor);
138            }
139    
140            // sets the delegate to our wrapped output
141            output = target;
142        }
143    
144        public void process(Exchange exchange) throws Exception {
145            Processor processor = getOutput();
146            if (processor != null && continueProcessing(exchange)) {
147                processor.process(exchange);
148            }
149        }
150    
151        /**
152         * Strategy to determine if we should continue processing the {@link Exchange}.
153         */
154        protected boolean continueProcessing(Exchange exchange) {
155            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
156            if (stop != null) {
157                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
158                if (doStop) {
159                    if (LOG.isDebugEnabled()) {
160                        LOG.debug("Exchange is marked to stop routing: " + exchange);
161                    }
162                    return false;
163                }
164            }
165            return true;
166        }
167    
168        @Override
169        public String toString() {
170            // just output the next processor as all the interceptors and error handler is just too verbose
171            return "Channel[" + nextProcessor + "]";
172        }
173    
174    }