001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.interceptor; 018 019 import java.util.Date; 020 import java.util.HashMap; 021 import java.util.Map; 022 023 import org.apache.camel.Endpoint; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.Processor; 026 import org.apache.camel.Producer; 027 import org.apache.camel.model.InterceptorDefinition; 028 import org.apache.camel.model.ProcessorDefinition; 029 import org.apache.camel.processor.DelegateProcessor; 030 import org.apache.camel.processor.Logger; 031 import org.apache.camel.spi.ExchangeFormatter; 032 import org.apache.camel.spi.InterceptStrategy; 033 import org.apache.camel.spi.TraceableUnitOfWork; 034 import org.apache.camel.util.IntrospectionSupport; 035 import org.apache.camel.util.ObjectHelper; 036 import org.apache.camel.util.ServiceHelper; 037 import org.apache.commons.logging.Log; 038 import org.apache.commons.logging.LogFactory; 039 040 /** 041 * An interceptor for debugging and tracing routes 042 * 043 * @version $Revision: 751357 $ 044 */ 045 public class TraceInterceptor extends DelegateProcessor implements ExchangeFormatter { 046 private static final transient Log LOG = LogFactory.getLog(TraceInterceptor.class); 047 private static final String JPA_TRACE_EVENT_MESSAGE = "org.apache.camel.processor.interceptor.JpaTraceEventMessage"; 048 private static final String TRACE_EVENT = "CamelTraceEvent"; 049 private Logger logger; 050 private Producer traceEventProducer; 051 private final ProcessorDefinition node; 052 private final Tracer tracer; 053 private TraceFormatter formatter; 054 private Class jpaTraceEventMessageClass; 055 056 public TraceInterceptor(ProcessorDefinition node, Processor target, TraceFormatter formatter, Tracer tracer) { 057 super(target); 058 this.tracer = tracer; 059 this.node = node; 060 this.formatter = formatter; 061 062 // set logger to use 063 if (tracer.getLogName() != null) { 064 logger = new Logger(LogFactory.getLog(tracer.getLogName()), this); 065 } else { 066 // use default logger 067 logger = new Logger(LogFactory.getLog(TraceInterceptor.class), this); 068 } 069 070 // set logging level if provided 071 if (tracer.getLogLevel() != null) { 072 logger.setLevel(tracer.getLogLevel()); 073 } 074 075 if (tracer.getFormatter() != null) { 076 this.formatter = tracer.getFormatter(); 077 } 078 } 079 080 public TraceInterceptor(ProcessorDefinition node, Processor target, Tracer tracer) { 081 this(node, target, null, tracer); 082 } 083 084 @Override 085 public String toString() { 086 return "TraceInterceptor[" + node + "]"; 087 } 088 089 public void process(final Exchange exchange) throws Exception { 090 // interceptor will also trace routes supposed only for TraceEvents so we need to skip 091 // logging TraceEvents to avoid infinite looping 092 if (exchange instanceof TraceEventExchange || exchange.getProperty(TRACE_EVENT, Boolean.class) != null) { 093 // but we must still process to allow routing of TraceEvents to eg a JPA endpoint 094 super.process(exchange); 095 return; 096 } 097 098 boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange); 099 100 // okay this is a regular exchange being routed we might need to log and trace 101 try { 102 // before 103 if (shouldLog) { 104 logExchange(exchange); 105 traceExchange(exchange); 106 107 // if traceable then register this as the previous node, now it has been logged 108 if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) { 109 TraceableUnitOfWork tuow = (TraceableUnitOfWork) exchange.getUnitOfWork(); 110 tuow.addInterceptedNode(node); 111 } 112 } 113 114 // process the exchange 115 super.proceed(exchange); 116 117 // after (trace out) 118 if (shouldLog && tracer.isTraceOutExchanges()) { 119 logExchange(exchange); 120 traceExchange(exchange); 121 } 122 } catch (Exception e) { 123 if (shouldLogException(exchange)) { 124 logException(exchange, e); 125 } 126 throw e; 127 } 128 } 129 130 public Object format(Exchange exchange) { 131 return formatter.format(this, this.getNode(), exchange); 132 } 133 134 // Properties 135 //------------------------------------------------------------------------- 136 public ProcessorDefinition getNode() { 137 return node; 138 } 139 140 public Logger getLogger() { 141 return logger; 142 } 143 144 public TraceFormatter getFormatter() { 145 return formatter; 146 } 147 148 // Implementation methods 149 //------------------------------------------------------------------------- 150 protected void logExchange(Exchange exchange) { 151 // process the exchange that formats and logs it 152 logger.process(exchange); 153 } 154 155 @SuppressWarnings("unchecked") 156 protected void traceExchange(Exchange exchange) throws Exception { 157 // should we send a trace event to an optional destination? 158 if (tracer.getDestination() != null || tracer.getDestinationUri() != null) { 159 // create event and add it as a property on the original exchange 160 TraceEventExchange event = new TraceEventExchange(exchange); 161 Date timestamp = new Date(); 162 event.setNodeId(node.getId()); 163 event.setTimestamp(timestamp); 164 event.setTracedExchange(exchange); 165 166 // create event message to send in body 167 TraceEventMessage msg = new DefaultTraceEventMessage(timestamp, node, exchange); 168 169 // should we use ordinary or jpa objects 170 if (tracer.isUseJpa()) { 171 LOG.trace("Using class: " + JPA_TRACE_EVENT_MESSAGE + " for tracing event messages"); 172 173 // load the jpa event class 174 synchronized (this) { 175 if (jpaTraceEventMessageClass == null) { 176 jpaTraceEventMessageClass = exchange.getContext().getClassResolver().resolveClass(JPA_TRACE_EVENT_MESSAGE); 177 if (jpaTraceEventMessageClass == null) { 178 throw new IllegalArgumentException("Cannot find class: " + JPA_TRACE_EVENT_MESSAGE 179 + ". Make sure camel-jpa.jar is in the classpath."); 180 } 181 } 182 } 183 184 Object jpa = ObjectHelper.newInstance(jpaTraceEventMessageClass); 185 186 // copy options from event to jpa 187 Map options = new HashMap(); 188 IntrospectionSupport.getProperties(msg, options, null); 189 IntrospectionSupport.setProperties(jpa, options); 190 // and set the timestamp as its not a String type 191 IntrospectionSupport.setProperty(jpa, "timestamp", msg.getTimestamp()); 192 193 event.getIn().setBody(jpa); 194 } else { 195 event.getIn().setBody(msg); 196 } 197 198 // marker property to indicate its a tracing event being routed in case 199 // new Exchange instances is created during trace routing so we can check 200 // for this marker when interceptor also kickins in during routing of trace events 201 event.setProperty(TRACE_EVENT, Boolean.TRUE); 202 try { 203 // process the trace route 204 getTraceEventProducer(exchange).process(event); 205 } catch (Exception e) { 206 // log and ignore this as the original Exchange should be allowed to continue 207 LOG.error("Error processing TraceEventExchange (original Exchange will be continued): " + event, e); 208 } 209 } 210 } 211 212 protected void logException(Exchange exchange, Throwable throwable) { 213 if (tracer.isTraceExceptions()) { 214 logger.process(exchange, throwable); 215 } 216 } 217 218 /** 219 * Returns true if the given exchange should be logged in the trace list 220 */ 221 protected boolean shouldLogExchange(Exchange exchange) { 222 return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange)); 223 } 224 225 /** 226 * Returns true if the given exchange should be logged when an exception was thrown 227 */ 228 protected boolean shouldLogException(Exchange exchange) { 229 return tracer.isTraceExceptions(); 230 } 231 232 /** 233 * Returns whether exchanges coming out of processors should be traced 234 */ 235 public boolean shouldTraceOutExchanges() { 236 return tracer.isTraceOutExchanges(); 237 } 238 239 /** 240 * Returns true if the given node should be logged in the trace list 241 */ 242 protected boolean shouldLogNode(ProcessorDefinition node) { 243 if (node == null) { 244 return false; 245 } 246 if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy || node instanceof InterceptorDefinition)) { 247 return false; 248 } 249 return true; 250 } 251 252 private synchronized Producer getTraceEventProducer(Exchange exchange) throws Exception { 253 if (traceEventProducer == null) { 254 // create producer when we have access the the camel context (we dont in doStart) 255 Endpoint endpoint = tracer.getDestination() != null ? tracer.getDestination() : exchange.getContext().getEndpoint(tracer.getDestinationUri()); 256 traceEventProducer = endpoint.createProducer(); 257 ServiceHelper.startService(traceEventProducer); 258 } 259 return traceEventProducer; 260 } 261 262 @Override 263 protected void doStart() throws Exception { 264 super.doStart(); 265 traceEventProducer = null; 266 } 267 268 @Override 269 protected void doStop() throws Exception { 270 super.doStop(); 271 if (traceEventProducer != null) { 272 ServiceHelper.stopService(traceEventProducer); 273 } 274 } 275 276 }