001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.interceptor; 018 019 import java.util.Date; 020 import java.util.HashMap; 021 import java.util.Map; 022 023 import org.apache.camel.Endpoint; 024 import org.apache.camel.Exchange; 025 import org.apache.camel.Processor; 026 import org.apache.camel.Producer; 027 import org.apache.camel.model.ProcessorDefinition; 028 import org.apache.camel.processor.DelegateProcessor; 029 import org.apache.camel.processor.Logger; 030 import org.apache.camel.spi.ExchangeFormatter; 031 import org.apache.camel.spi.InterceptStrategy; 032 import org.apache.camel.spi.TraceableUnitOfWork; 033 import org.apache.camel.util.IntrospectionSupport; 034 import org.apache.camel.util.ObjectHelper; 035 import org.apache.camel.util.ServiceHelper; 036 import org.apache.commons.logging.Log; 037 import org.apache.commons.logging.LogFactory; 038 039 /** 040 * An interceptor for debugging and tracing routes 041 * 042 * @version $Revision: 767405 $ 043 */ 044 public class TraceInterceptor extends DelegateProcessor implements ExchangeFormatter { 045 private static final transient Log LOG = LogFactory.getLog(TraceInterceptor.class); 046 private static final String JPA_TRACE_EVENT_MESSAGE = "org.apache.camel.processor.interceptor.JpaTraceEventMessage"; 047 private static final String TRACE_EVENT = "CamelTraceEvent"; 048 private Logger logger; 049 private Producer traceEventProducer; 050 private final ProcessorDefinition node; 051 private final Tracer tracer; 052 private TraceFormatter formatter; 053 private Class jpaTraceEventMessageClass; 054 055 public TraceInterceptor(ProcessorDefinition node, Processor target, TraceFormatter formatter, Tracer tracer) { 056 super(target); 057 this.tracer = tracer; 058 this.node = node; 059 this.formatter = formatter; 060 061 // set logger to use 062 if (tracer.getLogName() != null) { 063 logger = new Logger(LogFactory.getLog(tracer.getLogName()), this); 064 } else { 065 // use default logger 066 logger = new Logger(LogFactory.getLog(TraceInterceptor.class), this); 067 } 068 069 // set logging level if provided 070 if (tracer.getLogLevel() != null) { 071 logger.setLevel(tracer.getLogLevel()); 072 } 073 074 if (tracer.getFormatter() != null) { 075 this.formatter = tracer.getFormatter(); 076 } 077 } 078 079 public TraceInterceptor(ProcessorDefinition node, Processor target, Tracer tracer) { 080 this(node, target, null, tracer); 081 } 082 083 @Override 084 public String toString() { 085 return "TraceInterceptor[" + node + "]"; 086 } 087 088 public void process(final Exchange exchange) throws Exception { 089 // interceptor will also trace routes supposed only for TraceEvents so we need to skip 090 // logging TraceEvents to avoid infinite looping 091 if (exchange instanceof TraceEventExchange || exchange.getProperty(TRACE_EVENT, Boolean.class) != null) { 092 // but we must still process to allow routing of TraceEvents to eg a JPA endpoint 093 super.process(exchange); 094 return; 095 } 096 097 boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange); 098 099 // okay this is a regular exchange being routed we might need to log and trace 100 try { 101 // before 102 if (shouldLog) { 103 logExchange(exchange); 104 traceExchange(exchange); 105 106 // if traceable then register this as the previous node, now it has been logged 107 if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) { 108 TraceableUnitOfWork tuow = (TraceableUnitOfWork) exchange.getUnitOfWork(); 109 tuow.addInterceptedNode(node); 110 } 111 } 112 113 // process the exchange 114 super.proceed(exchange); 115 116 // after (trace out) 117 if (shouldLog && tracer.isTraceOutExchanges()) { 118 logExchange(exchange); 119 traceExchange(exchange); 120 } 121 } catch (Exception e) { 122 if (shouldLogException(exchange)) { 123 logException(exchange, e); 124 } 125 throw e; 126 } 127 } 128 129 public Object format(Exchange exchange) { 130 return formatter.format(this, this.getNode(), exchange); 131 } 132 133 // Properties 134 //------------------------------------------------------------------------- 135 public ProcessorDefinition getNode() { 136 return node; 137 } 138 139 public Logger getLogger() { 140 return logger; 141 } 142 143 public TraceFormatter getFormatter() { 144 return formatter; 145 } 146 147 // Implementation methods 148 //------------------------------------------------------------------------- 149 protected void logExchange(Exchange exchange) { 150 // process the exchange that formats and logs it 151 logger.process(exchange); 152 } 153 154 @SuppressWarnings("unchecked") 155 protected void traceExchange(Exchange exchange) throws Exception { 156 // should we send a trace event to an optional destination? 157 if (tracer.getDestination() != null || tracer.getDestinationUri() != null) { 158 // create event and add it as a property on the original exchange 159 TraceEventExchange event = new TraceEventExchange(exchange); 160 Date timestamp = new Date(); 161 event.setNodeId(node.getId()); 162 event.setTimestamp(timestamp); 163 event.setTracedExchange(exchange); 164 165 // create event message to send in body 166 TraceEventMessage msg = new DefaultTraceEventMessage(timestamp, node, exchange); 167 168 // should we use ordinary or jpa objects 169 if (tracer.isUseJpa()) { 170 LOG.trace("Using class: " + JPA_TRACE_EVENT_MESSAGE + " for tracing event messages"); 171 172 // load the jpa event class 173 synchronized (this) { 174 if (jpaTraceEventMessageClass == null) { 175 jpaTraceEventMessageClass = exchange.getContext().getClassResolver().resolveClass(JPA_TRACE_EVENT_MESSAGE); 176 if (jpaTraceEventMessageClass == null) { 177 throw new IllegalArgumentException("Cannot find class: " + JPA_TRACE_EVENT_MESSAGE 178 + ". Make sure camel-jpa.jar is in the classpath."); 179 } 180 } 181 } 182 183 Object jpa = ObjectHelper.newInstance(jpaTraceEventMessageClass); 184 185 // copy options from event to jpa 186 Map options = new HashMap(); 187 IntrospectionSupport.getProperties(msg, options, null); 188 IntrospectionSupport.setProperties(jpa, options); 189 // and set the timestamp as its not a String type 190 IntrospectionSupport.setProperty(jpa, "timestamp", msg.getTimestamp()); 191 192 event.getIn().setBody(jpa); 193 } else { 194 event.getIn().setBody(msg); 195 } 196 197 // marker property to indicate its a tracing event being routed in case 198 // new Exchange instances is created during trace routing so we can check 199 // for this marker when interceptor also kickins in during routing of trace events 200 event.setProperty(TRACE_EVENT, Boolean.TRUE); 201 try { 202 // process the trace route 203 getTraceEventProducer(exchange).process(event); 204 } catch (Exception e) { 205 // log and ignore this as the original Exchange should be allowed to continue 206 LOG.error("Error processing TraceEventExchange (original Exchange will be continued): " + event, e); 207 } 208 } 209 } 210 211 protected void logException(Exchange exchange, Throwable throwable) { 212 if (tracer.isTraceExceptions()) { 213 logger.process(exchange, throwable); 214 } 215 } 216 217 /** 218 * Returns true if the given exchange should be logged in the trace list 219 */ 220 protected boolean shouldLogExchange(Exchange exchange) { 221 return tracer.isEnabled() && (tracer.getTraceFilter() == null || tracer.getTraceFilter().matches(exchange)); 222 } 223 224 /** 225 * Returns true if the given exchange should be logged when an exception was thrown 226 */ 227 protected boolean shouldLogException(Exchange exchange) { 228 return tracer.isTraceExceptions(); 229 } 230 231 /** 232 * Returns whether exchanges coming out of processors should be traced 233 */ 234 public boolean shouldTraceOutExchanges() { 235 return tracer.isTraceOutExchanges(); 236 } 237 238 /** 239 * Returns true if the given node should be logged in the trace list 240 */ 241 protected boolean shouldLogNode(ProcessorDefinition node) { 242 if (node == null) { 243 return false; 244 } 245 if (!tracer.isTraceInterceptors() && (node instanceof InterceptStrategy)) { 246 return false; 247 } 248 return true; 249 } 250 251 private synchronized Producer getTraceEventProducer(Exchange exchange) throws Exception { 252 if (traceEventProducer == null) { 253 // create producer when we have access the the camel context (we dont in doStart) 254 Endpoint endpoint = tracer.getDestination() != null ? tracer.getDestination() : exchange.getContext().getEndpoint(tracer.getDestinationUri()); 255 traceEventProducer = endpoint.createProducer(); 256 ServiceHelper.startService(traceEventProducer); 257 } 258 return traceEventProducer; 259 } 260 261 @Override 262 protected void doStart() throws Exception { 263 super.doStart(); 264 traceEventProducer = null; 265 } 266 267 @Override 268 protected void doStop() throws Exception { 269 super.doStop(); 270 if (traceEventProducer != null) { 271 ServiceHelper.stopService(traceEventProducer); 272 } 273 } 274 275 }