001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.Date;
020    import java.util.HashMap;
021    import java.util.Map;
022    
023    import org.apache.camel.Endpoint;
024    import org.apache.camel.Exchange;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Producer;
027    import org.apache.camel.model.ProcessorDefinition;
028    import org.apache.camel.processor.DelegateProcessor;
029    import org.apache.camel.processor.Logger;
030    import org.apache.camel.spi.ExchangeFormatter;
031    import org.apache.camel.spi.InterceptStrategy;
032    import org.apache.camel.spi.TraceableUnitOfWork;
033    import org.apache.camel.util.IntrospectionSupport;
034    import org.apache.camel.util.ObjectHelper;
035    import org.apache.camel.util.ServiceHelper;
036    import org.apache.commons.logging.Log;
037    import org.apache.commons.logging.LogFactory;
038    
039    /**
040     * An interceptor for debugging and tracing routes
041     *
042     * @version $Revision: 767405 $
043     */
044    public class TraceInterceptor extends DelegateProcessor implements ExchangeFormatter {
045        private static final transient Log LOG = LogFactory.getLog(TraceInterceptor.class);
046        private static final String JPA_TRACE_EVENT_MESSAGE = "org.apache.camel.processor.interceptor.JpaTraceEventMessage";
047        private static final String TRACE_EVENT = "CamelTraceEvent";
048        private Logger logger;
049        private Producer traceEventProducer;
050        private final ProcessorDefinition node;
051        private final Tracer tracer;
052        private TraceFormatter formatter;
053        private Class jpaTraceEventMessageClass;
054    
055        public TraceInterceptor(ProcessorDefinition node, Processor target, TraceFormatter formatter, Tracer tracer) {
056            super(target);
057            this.tracer = tracer;
058            this.node = node;
059            this.formatter = formatter;
060    
061            // set logger to use
062            if (tracer.getLogName() != null) {
063                logger = new Logger(LogFactory.getLog(tracer.getLogName()), this);
064            } else {
065                // use default logger
066                logger = new Logger(LogFactory.getLog(TraceInterceptor.class), this);
067            }
068    
069            // set logging level if provided
070            if (tracer.getLogLevel() != null) {
071                logger.setLevel(tracer.getLogLevel());
072            }
073    
074            if (tracer.getFormatter() != null) {
075                this.formatter = tracer.getFormatter();
076            }
077        }
078    
079        public TraceInterceptor(ProcessorDefinition node, Processor target, Tracer tracer) {
080            this(node, target, null, tracer);
081        }
082    
083        @Override
084        public String toString() {
085            return "TraceInterceptor[" + node + "]";
086        }
087    
088        public void process(final Exchange exchange) throws Exception {
089            // interceptor will also trace routes supposed only for TraceEvents so we need to skip
090            // logging TraceEvents to avoid infinite looping
091            if (exchange instanceof TraceEventExchange || exchange.getProperty(TRACE_EVENT, Boolean.class) != null) {
092                // but we must still process to allow routing of TraceEvents to eg a JPA endpoint
093                super.process(exchange);
094                return;
095            }
096    
097            boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
098    
099            // okay this is a regular exchange being routed we might need to log and trace
100            try {
101                // before
102                if (shouldLog) {
103                    logExchange(exchange);
104                    traceExchange(exchange);
105    
106                    // if traceable then register this as the previous node, now it has been logged
107                    if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
108                        TraceableUnitOfWork tuow = (TraceableUnitOfWork) exchange.getUnitOfWork();
109                        tuow.addInterceptedNode(node);
110                    }
111                }
112    
113                // process the exchange
114                super.proceed(exchange);
115    
116                // after (trace out)
117                if (shouldLog && tracer.isTraceOutExchanges()) {
118                    logExchange(exchange);
119                    traceExchange(exchange);
120                }
121            } catch (Exception e) {
122                if (shouldLogException(exchange)) {
123                    logException(exchange, e);
124                }
125                throw e;
126            }
127        }
128    
129        public Object format(Exchange exchange) {
130            return formatter.format(this, this.getNode(), exchange);
131        }
132    
133        // Properties
134        //-------------------------------------------------------------------------
135        public ProcessorDefinition getNode() {
136            return node;
137        }
138    
139        public Logger getLogger() {
140            return logger;
141        }
142    
143        public TraceFormatter getFormatter() {
144            return formatter;
145        }
146    
147        // Implementation methods
148        //-------------------------------------------------------------------------
149        protected void logExchange(Exchange exchange) {
150            // process the exchange that formats and logs it
151            logger.process(exchange);
152        }
153    
154        @SuppressWarnings("unchecked")
155        protected void traceExchange(Exchange exchange) throws Exception {
156            // should we send a trace event to an optional destination?
157            if (tracer.getDestination() != null || tracer.getDestinationUri() != null) {
158                // create event and add it as a property on the original exchange
159                TraceEventExchange event = new TraceEventExchange(exchange);
160                Date timestamp = new Date();
161                event.setNodeId(node.getId());
162                event.setTimestamp(timestamp);
163                event.setTracedExchange(exchange);
164    
165                // create event message to send in body
166                TraceEventMessage msg = new DefaultTraceEventMessage(timestamp, node, exchange);
167    
168                // should we use ordinary or jpa objects
169                if (tracer.isUseJpa()) {
170                    LOG.trace("Using class: " + JPA_TRACE_EVENT_MESSAGE + " for tracing event messages");
171    
172                    // load the jpa event class
173                    synchronized (this) {
174                        if (jpaTraceEventMessageClass == null) {
175                            jpaTraceEventMessageClass = exchange.getContext().getClassResolver().resolveClass(JPA_TRACE_EVENT_MESSAGE);
176                            if (jpaTraceEventMessageClass == null) {
177                                throw new IllegalArgumentException("Cannot find class: " + JPA_TRACE_EVENT_MESSAGE
178                                        + ". Make sure camel-jpa.jar is in the classpath.");
179                            }
180                        }
181                    }
182    
183                    Object jpa = ObjectHelper.newInstance(jpaTraceEventMessageClass);
184    
185                    // copy options from event to jpa
186                    Map options = new HashMap();
187                    IntrospectionSupport.getProperties(msg, options, null);
188                    IntrospectionSupport.setProperties(jpa, options);
189                    // and set the timestamp as its not a String type
190                    IntrospectionSupport.setProperty(jpa, "timestamp", msg.getTimestamp());
191    
192                    event.getIn().setBody(jpa);
193                } else {
194                    event.getIn().setBody(msg);
195                }
196    
197                // marker property to indicate its a tracing event being routed in case
198                // new Exchange instances is created during trace routing so we can check
199                // for this marker when interceptor also kickins in during routing of trace events
200                event.setProperty(TRACE_EVENT, Boolean.TRUE);
201                try {
202                    // process the trace route
203                    getTraceEventProducer(exchange).process(event);
204                } catch (Exception e) {
205                    // log and ignore this as the original Exchange should be allowed to continue
206                    LOG.error("Error processing TraceEventExchange (original Exchange will be continued): " + event, e);
207                }
208            }
209        }
210    
211        protected void logException(Exchange exchange, Throwable throwable) {
212            if (tracer.isTraceExceptions()) {
213                logger.process(exchange, throwable);
214            }
215        }
216    
217        /**
218         * Returns true if the given exchange should be logged in the trace list
219         */
220        protected boolean shouldLogExchange(Exchange exchange) {
221            return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange));
222        }
223    
224        /**
225         * Returns true if the given exchange should be logged when an exception was thrown
226         */
227        protected boolean shouldLogException(Exchange exchange) {
228            return tracer.isTraceExceptions();
229        }
230    
231        /**
232         * Returns whether exchanges coming out of processors should be traced
233         */
234        public boolean shouldTraceOutExchanges() {
235            return tracer.isTraceOutExchanges();
236        }
237    
238        /**
239         * Returns true if the given node should be logged in the trace list
240         */
241        protected boolean shouldLogNode(ProcessorDefinition node) {
242            if (node == null) {
243                return false;
244            }
245            if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy)) {
246                return false;
247            }
248            return true;
249        }
250    
251        private synchronized Producer getTraceEventProducer(Exchange exchange) throws Exception {
252            if (traceEventProducer == null) {
253                // create producer when we have access the the camel context (we dont in doStart)
254                Endpoint endpoint = tracer.getDestination() != null ? tracer.getDestination() : exchange.getContext().getEndpoint(tracer.getDestinationUri());
255                traceEventProducer = endpoint.createProducer();
256                ServiceHelper.startService(traceEventProducer);
257            }
258            return traceEventProducer;
259        }
260    
261        @Override
262        protected void doStart() throws Exception {
263            super.doStart();
264            traceEventProducer = null;
265        }
266    
267        @Override
268        protected void doStop() throws Exception {
269            super.doStop();
270            if (traceEventProducer != null) {
271                ServiceHelper.stopService(traceEventProducer);
272            }
273        }
274    
275    }