001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.List;
023    import java.util.Map;
024    
025    import javax.management.JMException;
026    import javax.management.MalformedObjectNameException;
027    import javax.management.ObjectName;
028    
029    import org.apache.camel.CamelContext;
030    import org.apache.camel.Consumer;
031    import org.apache.camel.Endpoint;
032    import org.apache.camel.Route;
033    import org.apache.camel.Service;
034    import org.apache.camel.impl.DefaultCamelContext;
035    import org.apache.camel.impl.ServiceSupport;
036    import org.apache.camel.model.OnExceptionDefinition;
037    import org.apache.camel.model.ProcessorDefinition;
038    import org.apache.camel.model.RouteDefinition;
039    import org.apache.camel.spi.InstrumentationAgent;
040    import org.apache.camel.spi.LifecycleStrategy;
041    import org.apache.camel.spi.RouteContext;
042    import org.apache.commons.logging.Log;
043    import org.apache.commons.logging.LogFactory;
044    
045    /**
046     * JMX agent that registeres Camel lifecycle events in JMX.
047     *
048     * @version $Revision: 751357 $
049     */
050    public class InstrumentationLifecycleStrategy implements LifecycleStrategy {
051        private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
052    
053        private InstrumentationAgent agent;
054        private CamelNamingStrategy namingStrategy;
055        private boolean initialized;
056    
057        // A map (Endpoint -> InstrumentationProcessor) to facilitate
058        // adding per-route interceptor and registering ManagedRoute MBean
059        private final Map<Endpoint, InstrumentationProcessor> interceptorMap =
060            new HashMap<Endpoint, InstrumentationProcessor>();
061    
062        public InstrumentationLifecycleStrategy() {
063            this(new DefaultInstrumentationAgent());
064        }
065    
066        public InstrumentationLifecycleStrategy(InstrumentationAgent agent) {
067            this.agent = agent;
068        }
069        /**
070         * Constructor for camel context that has been started.
071         *
072         * @param agent    the agent
073         * @param context  the camel context
074         */
075        public InstrumentationLifecycleStrategy(InstrumentationAgent agent, CamelContext context) {
076            this.agent = agent;
077            onContextStart(context);
078        }
079    
080        public void onContextStart(CamelContext context) {
081            if (context instanceof DefaultCamelContext) {
082                try {
083                    initialized = true;
084                    DefaultCamelContext dc = (DefaultCamelContext)context;
085                    // call addService so that context will start and stop the agent
086                    dc.addService(agent);
087                    namingStrategy = new CamelNamingStrategy(agent.getMBeanObjectDomainName());
088                    ManagedService ms = new ManagedService(dc);
089                    agent.register(ms, getNamingStrategy().getObjectName(dc));
090                } catch (Exception e) {
091                    LOG.warn("Could not register CamelContext MBean", e);
092                }
093            }
094        }
095    
096        public void onEndpointAdd(Endpoint endpoint) {
097            // the agent hasn't been started
098            if (!initialized) {
099                return;
100            }
101    
102            try {
103                ManagedEndpoint me = new ManagedEndpoint(endpoint);
104                agent.register(me, getNamingStrategy().getObjectName(me));
105            } catch (JMException e) {
106                LOG.warn("Could not register Endpoint MBean", e);
107            }
108        }
109    
110        @SuppressWarnings("unchecked")
111        public void onRoutesAdd(Collection<Route> routes) {
112            // the agent hasn't been started
113            if (!initialized) {
114                return;
115            }
116    
117            for (Route route : routes) {
118                try {
119                    ManagedRoute mr = new ManagedRoute(route);
120                    // retrieve the per-route intercept for this route
121                    InstrumentationProcessor interceptor = interceptorMap.get(route.getEndpoint());
122                    if (interceptor == null) {
123                        LOG.warn("Instrumentation processor not found for route endpoint: " + route.getEndpoint());
124                    } else {
125                        interceptor.setCounter(mr);
126                    }
127                    agent.register(mr, getNamingStrategy().getObjectName(mr));
128                } catch (JMException e) {
129                    LOG.warn("Could not register Route MBean", e);
130                }
131            }
132        }
133    
134        public void onServiceAdd(CamelContext context, Service service) {
135            // the agent hasn't been started
136            if (!initialized) {
137                return;
138            }
139            if (service instanceof ServiceSupport && service instanceof Consumer) {
140                // TODO: add support for non-consumer services?
141                try {
142                    ManagedService ms = new ManagedService((ServiceSupport)service);
143                    agent.register(ms, getNamingStrategy().getObjectName(context, ms));
144                } catch (JMException e) {
145                    LOG.warn("Could not register Service MBean", e);
146                }
147            }
148        }
149    
150        public void onRouteContextCreate(RouteContext routeContext) {
151            // the agent hasn't been started
152            if (!initialized) {
153                return;
154            }
155    
156            // Create a map (ProcessorType -> PerformanceCounter)
157            // to be passed to InstrumentationInterceptStrategy.
158            Map<ProcessorDefinition, PerformanceCounter> counterMap =
159                new HashMap<ProcessorDefinition, PerformanceCounter>();
160    
161            // Each processor in a route will have its own performance counter
162            // The performance counter are MBeans that we register with MBeanServer.
163            // These performance counter will be embedded
164            // to InstrumentationProcessor and wrap the appropriate processor
165            // by InstrumentationInterceptStrategy.
166            RouteDefinition route = routeContext.getRoute();
167            
168            for (ProcessorDefinition processor : route.getOutputs()) {
169                ObjectName name = null;
170                try {
171                    // get the mbean name
172                    name = getNamingStrategy().getObjectName(routeContext, processor);
173    
174                    // register mbean wrapped in the performance counter mbean
175                    PerformanceCounter pc = new PerformanceCounter();
176                    agent.register(pc, name);
177    
178                    // add to map now that it has been registered
179                    counterMap.put(processor, pc);
180                } catch (MalformedObjectNameException e) {
181                    LOG.warn("Could not create MBean name: " + name, e);
182                } catch (JMException e) {
183                    LOG.warn("Could not register PerformanceCounter MBean: " + name, e);
184                }
185            }
186            
187            routeContext.addInterceptStrategy(new InstrumentationInterceptStrategy(counterMap));
188            routeContext.setErrorHandlerWrappingStrategy(new InstrumentationErrorHandlerWrappingStrategy(routeContext, counterMap));
189    
190            // Add an InstrumentationProcessor at the beginning of each route and
191            // set up the interceptorMap for onRoutesAdd() method to register the
192            // ManagedRoute MBeans.
193    
194            RouteDefinition routeType = routeContext.getRoute();
195            if (routeType.getInputs() != null && !routeType.getInputs().isEmpty()) {
196                if (routeType.getInputs().size() > 1) {
197                    LOG.warn("Add InstrumentationProcessor to first input only.");
198                }
199    
200                Endpoint endpoint  = routeType.getInputs().get(0).getEndpoint();
201    
202                List<ProcessorDefinition> exceptionHandlers = new ArrayList<ProcessorDefinition>();
203                List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
204    
205                // separate out the exception handers in the outputs
206                for (ProcessorDefinition output : routeType.getOutputs()) {
207                    if (output instanceof OnExceptionDefinition) {
208                        exceptionHandlers.add(output);
209                    } else {
210                        outputs.add(output);
211                    }
212                }
213    
214                // clearing the outputs
215                routeType.clearOutput();
216    
217                // add exception handlers as top children
218                routeType.getOutputs().addAll(exceptionHandlers);
219    
220                // add an interceptor
221                InstrumentationProcessor processor = new InstrumentationProcessor();
222                routeType.intercept(processor);
223    
224                // add the output
225                for (ProcessorDefinition processorType : outputs) {
226                    routeType.addOutput(processorType);
227                }
228    
229                interceptorMap.put(endpoint, processor);
230            }
231    
232        }
233    
234        public CamelNamingStrategy getNamingStrategy() {
235            return namingStrategy;
236        }
237    
238        public void setNamingStrategy(CamelNamingStrategy strategy) {
239            this.namingStrategy = strategy;
240        }
241    
242        public void setAgent(InstrumentationAgent agent) {
243            this.agent = agent;
244        }
245    
246    }