001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.lang.reflect.Method;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.Map;
023    import javax.management.JMException;
024    import javax.management.MalformedObjectNameException;
025    import javax.management.ObjectName;
026    
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Endpoint;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Route;
032    import org.apache.camel.Service;
033    import org.apache.camel.impl.DefaultCamelContext;
034    import org.apache.camel.impl.ServiceSupport;
035    import org.apache.camel.model.ProcessorDefinition;
036    import org.apache.camel.model.RouteDefinition;
037    import org.apache.camel.spi.ClassResolver;
038    import org.apache.camel.spi.InstrumentationAgent;
039    import org.apache.camel.spi.InterceptStrategy;
040    import org.apache.camel.spi.LifecycleStrategy;
041    import org.apache.camel.spi.RouteContext;
042    import org.apache.camel.util.ObjectHelper;
043    import org.apache.commons.logging.Log;
044    import org.apache.commons.logging.LogFactory;
045    
046    /**
047     * JMX agent that registeres Camel lifecycle events in JMX.
048     *
049     * @version $Revision: 770908 $
050     */
051    public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
052        private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
053    
054        private static final String MANAGED_RESOURCE_CLASSNAME = "org.springframework.jmx.export.annotation.ManagedResource";
055        private InstrumentationAgent agent;
056        private CamelNamingStrategy namingStrategy;
057        private boolean initialized;
058        private final Map<Endpoint, InstrumentationProcessor> registeredRoutes = new HashMap<Endpoint, InstrumentationProcessor>();
059    
060        public InstrumentationLifecycleStrategy() {
061            this(new DefaultInstrumentationAgent());
062        }
063    
064        public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
065            this.agent = agent;
066        }
067        /**
068         * Constructor for camel context that has been started.
069         *
070         * @param agent    the agent
071         * @param context  the camel context
072         */
073        public InstrumentationLifecycleStrategy(InstrumentationAgent agent, CamelContext context) {
074            this.agent = agent;
075            onContextStart(context);
076        }
077    
078        public void onContextStart(CamelContext context) {
079            // register camel context
080            if (context instanceof DefaultCamelContext) {
081                try {
082                    initialized = true;
083                    DefaultCamelContext dc = (DefaultCamelContext)context;
084                    // call addService so that context will start and stop the agent
085                    dc.addService(agent);
086                    namingStrategy = new CamelNamingStrategy(agent.getMBeanObjectDomainName());
087                    ManagedService ms = new ManagedService(dc);
088                    agent.register(ms, getNamingStrategy().getObjectName(dc));
089                } catch (Exception e) {
090                    // must rethrow to allow CamelContext fallback to non JMX agent to allow
091                    // Camel to continue to run
092                    throw ObjectHelper.wrapRuntimeCamelException(e);
093                }
094            }
095        }
096    
097        /**
098         * If the endpoint is an instance of ManagedResource then register it with the
099         * mbean server, if it is not then wrap the endpoint in a {@link ManagedEndpoint} and
100         * register that with the mbean server.
101         * @param endpoint the Endpoint attempted to be added
102         */
103        @SuppressWarnings("unchecked")
104        public void onEndpointAdd(Endpoint endpoint) {
105            // the agent hasn't been started
106            if (!initialized) {
107                return;
108            }
109    
110            // see if the spring-jmx is on the classpath
111            Class annotationClass = resolveManagedAnnotation(endpoint);
112            if (annotationClass == null) {
113                // no its not so register the endpoint as a new managed endpoint
114                registerEndpointAsManagedEndpoint(endpoint);
115                return;
116            }
117    
118            // see if the endpoint have been annotation with a spring JMX annotation
119            Object annotation = endpoint.getClass().getAnnotation(annotationClass);
120            if (annotation == null) {
121                // no its not so register the endpoint as a new managed endpoint
122                registerEndpointAsManagedEndpoint(endpoint);
123            } else {
124                // there is already a spring JMX annotation so attempt to register it
125                attemptToRegisterManagedResource(endpoint, annotation);
126            }
127        }
128    
129        private Class resolveManagedAnnotation(Endpoint endpoint) {
130            CamelContext context = endpoint.getCamelContext();
131    
132            ClassResolver resolver = context.getClassResolver();
133            return resolver.resolveClass(MANAGED_RESOURCE_CLASSNAME);
134        }
135    
136        private void attemptToRegisterManagedResource(Endpoint endpoint, Object annotation) {
137            try {
138                Method m = annotation.getClass().getMethod("objectName");
139    
140                String objectNameStr = (String) m.invoke(annotation);
141    
142                ObjectName objectName = new ObjectName(objectNameStr);
143                agent.register(endpoint, objectName);
144            } catch (Exception e) {
145                LOG.debug("objectName method not present, wrapping endpoint in ManagedEndpoint instead");
146                registerEndpointAsManagedEndpoint(endpoint);
147            }
148        }
149    
150        private void registerEndpointAsManagedEndpoint(Endpoint endpoint) {
151            try {
152                ManagedEndpoint me = new ManagedEndpoint(endpoint);
153                agent.register(me, getNamingStrategy().getObjectName(me));
154            } catch (JMException e) {
155                LOG.warn("Could not register Endpoint MBean for uri: " + endpoint.getEndpointUri(), e);
156            }
157        }
158    
159        @SuppressWarnings("unchecked")
160        public void onRoutesAdd(Collection<Route> routes) {
161            // the agent hasn't been started
162            if (!initialized) {
163                return;
164            }
165    
166            for (Route route : routes) {
167                try {
168                    ManagedRoute mr = new ManagedRoute(route);
169                    // retrieve the per-route intercept for this route
170                    InstrumentationProcessor processor = registeredRoutes.get(route.getEndpoint());
171                    if (processor == null) {
172                        LOG.warn("Route has not been instrumented for endpoint: " + route.getEndpoint());
173                    } else {
174                        // let the instrumentation use our route counter
175                        processor.setCounter(mr);
176                    }
177                    agent.register(mr, getNamingStrategy().getObjectName(mr));
178                } catch (JMException e) {
179                    LOG.warn("Could not register Route MBean", e);
180                }
181            }
182        }
183    
184        public void onServiceAdd(CamelContext context, Service service) {
185            // the agent hasn't been started
186            if (!initialized) {
187                return;
188            }
189    
190            // register consumer
191            if (service instanceof ServiceSupport && service instanceof Consumer) {
192                // TODO: add support for non-consumer services?
193                try {
194                    ManagedService ms = new ManagedService((ServiceSupport)service);
195                    agent.register(ms, getNamingStrategy().getObjectName(context, ms));
196                } catch (JMException e) {
197                    LOG.warn("Could not register Service MBean", e);
198                }
199            }
200        }
201    
202        public void onRouteContextCreate(RouteContext routeContext) {
203            // the agent hasn't been started
204            if (!initialized) {
205                return;
206            }
207    
208            // Create a map (ProcessorType -> PerformanceCounter)
209            // to be passed to InstrumentationInterceptStrategy.
210            Map<ProcessorDefinition, PerformanceCounter> registeredCounters =
211                new HashMap<ProcessorDefinition, PerformanceCounter>();
212    
213            // Each processor in a route will have its own performance counter
214            // The performance counter are MBeans that we register with MBeanServer.
215            // These performance counter will be embedded
216            // to InstrumentationProcessor and wrap the appropriate processor
217            // by InstrumentationInterceptStrategy.
218            RouteDefinition route = routeContext.getRoute();
219    
220            // TODO: This only registers counters for the first outputs in the route
221            // all the chidren of the outputs is not registered
222            // we should leverge the Channel for this to ensure we register all processors
223            // in the entire route graph
224    
225            // register all processors
226            for (ProcessorDefinition processor : route.getOutputs()) {
227                // skip processors that should not be registered
228                if (!registerProcessor(processor)) {
229                    continue;
230                }
231    
232                ObjectName name = null;
233                try {
234                    // get the mbean name
235                    name = getNamingStrategy().getObjectName(routeContext, processor);
236    
237                    // register mbean wrapped in the performance counter mbean
238                    PerformanceCounter pc = new PerformanceCounter();
239                    agent.register(pc, name);
240    
241                    // add to map now that it has been registered
242                    registeredCounters.put(processor, pc);
243                } catch (MalformedObjectNameException e) {
244                    LOG.warn("Could not create MBean name: " + name, e);
245                } catch (JMException e) {
246                    LOG.warn("Could not register PerformanceCounter MBean: " + name, e);
247                }
248            }
249    
250            // add intercept strategy that executes the JMX instrumentation for performance metrics
251            routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(registeredCounters));
252    
253            // instrument the route endpoint
254            final Endpoint endpoint = routeContext.getEndpoint();
255    
256            // only needed to register on the first output as all rotues will pass through this one
257            ProcessorDefinition out = routeContext.getRoute().getOutputs().get(0);
258    
259            // add an intercept strategy that counts when the route sends to any of its outputs
260            out.addInterceptStrategy(new InterceptStrategy() {
261                public Processor wrapProcessorInInterceptors(ProcessorDefinition processorDefinition, Processor target, Processor nextTarget) throws Exception {
262                    if (registeredRoutes.containsKey(endpoint)) {
263                        // do not double wrap
264                        return target;
265                    }
266                    InstrumentationProcessor wrapper = new InstrumentationProcessor(null);
267                    wrapper.setType(processorDefinition.getShortName());
268                    wrapper.setProcessor(target);
269    
270                    // register our wrapper
271                    registeredRoutes.put(endpoint, wrapper);
272    
273                    return wrapper;
274                }
275            });
276    
277        }
278    
279        /**
280         * Should the given processor be registered.
281         */
282        protected boolean registerProcessor(ProcessorDefinition processor) {
283            if (agent instanceof DefaultInstrumentationAgent) {
284                DefaultInstrumentationAgent dia = (DefaultInstrumentationAgent) agent;
285                if (dia.getOnlyRegisterProcessorWithCustomId() != null && dia.getOnlyRegisterProcessorWithCustomId()) {
286                    // only register if the processor have an explicy id assigned
287                    return processor.hasCustomIdAssigned();
288                }
289            }
290    
291            // fallback to always register it
292            return true;
293        }
294    
295        public CamelNamingStrategy getNamingStrategy() {
296            return namingStrategy;
297        }
298    
299        public void setNamingStrategy(CamelNamingStrategy strategy) {
300            this.namingStrategy = strategy;
301        }
302    
303        public void setAgent(InstrumentationAgent agent) {
304            this.agent = agent;
305        }
306    
307    }