001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.net.URI;
020    import java.util.Map;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.ScheduledThreadPoolExecutor;
023    import java.util.concurrent.ThreadFactory;
024    
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Component;
027    import org.apache.camel.Endpoint;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.util.IntrospectionSupport;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.camel.util.URISupport;
032    
033    /**
034     * @version $Revision: 563607 $
035     */
036    public abstract class DefaultComponent<E extends Exchange> extends ServiceSupport implements Component<E> {
037    
038        private int defaultThreadPoolSize = 5;
039        private CamelContext camelContext;
040        private ScheduledExecutorService executorService;
041    
042        public DefaultComponent() {
043        }
044    
045        public DefaultComponent(CamelContext context) {
046            this.camelContext = context;
047        }
048    
049        public Endpoint<E> createEndpoint(String uri) throws Exception {
050            ObjectHelper.notNull(getCamelContext(), "camelContext");
051            URI u = new URI(uri);
052            String path = u.getSchemeSpecificPart();
053    
054            // lets trim off any query arguments
055            if (path.startsWith("//")) {
056                path = path.substring(2);
057            }
058            int idx = path.indexOf('?');
059            if (idx > 0) {
060                path = path.substring(0, idx);
061            }
062            Map parameters = URISupport.parseParamters(u);
063    
064            Endpoint<E> endpoint = createEndpoint(uri, path, parameters);
065            if (endpoint == null) {
066                return null;
067            }
068            if (parameters != null) {
069                if (endpoint instanceof ScheduledPollEndpoint) {
070                    ScheduledPollEndpoint scheduledPollEndpoint = (ScheduledPollEndpoint)endpoint;
071                    scheduledPollEndpoint.configureProperties(parameters);
072                }
073                IntrospectionSupport.setProperties(endpoint, parameters);
074            }
075            return endpoint;
076        }
077    
078        public CamelContext getCamelContext() {
079            return camelContext;
080        }
081    
082        public void setCamelContext(CamelContext context) {
083            this.camelContext = context;
084        }
085    
086        public ScheduledExecutorService getExecutorService() {
087            if (executorService == null) {
088                executorService = createExecutorService();
089            }
090            return executorService;
091        }
092    
093        public void setExecutorService(ScheduledExecutorService executorService) {
094            this.executorService = executorService;
095        }
096    
097        /**
098         * A factory method to create a default thread pool and executor
099         */
100        protected ScheduledExecutorService createExecutorService() {
101            return new ScheduledThreadPoolExecutor(defaultThreadPoolSize, new ThreadFactory() {
102                int counter;
103    
104                public synchronized Thread newThread(Runnable runnable) {
105                    Thread thread = new Thread(runnable);
106                    thread.setName("Thread: " + (++counter) + " " + DefaultComponent.this.toString());
107                    return thread;
108                }
109            });
110        }
111    
112        protected void doStart() throws Exception {
113        }
114    
115        protected void doStop() throws Exception {
116            if (executorService != null) {
117                executorService.shutdown();
118            }
119        }
120    
121        /**
122         * A factory method allowing derived components to create a new endpoint
123         * from the given URI, remaining path and optional parameters
124         * 
125         * @param uri the full URI of the endpoint
126         * @param remaining the remaining part of the URI without the query
127         *                parameters or component prefix
128         * @param parameters the optional parameters passed in
129         * @return a newly created endpoint or null if the endpoint cannot be
130         *         created based on the inputs
131         */
132        protected abstract Endpoint<E> createEndpoint(String uri, String remaining, Map parameters)
133            throws Exception;
134    }