001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.lang.reflect.ParameterizedType;
020    import java.lang.reflect.Type;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.ScheduledThreadPoolExecutor;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Component;
026    import org.apache.camel.Endpoint;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.PollingConsumer;
029    import org.apache.camel.util.ObjectHelper;
030    
031    /**
032     * A default endpoint useful for implementation inheritance
033     *
034     * @version $Revision: 563607 $
035     */
036    public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
037        private String endpointUri;
038        private CamelContext context;
039        private Component component;
040        private ScheduledExecutorService executorService;
041    
042        protected DefaultEndpoint(String endpointUri, Component component) {
043            this(endpointUri, component.getCamelContext());
044            this.component = component;
045        }
046    
047        protected DefaultEndpoint(String endpointUri, CamelContext context) {
048            this.endpointUri = endpointUri;
049            this.context = context;
050        }
051    
052        public int hashCode() {
053            return endpointUri.hashCode() * 37 + 1;
054        }
055    
056        @Override
057        public boolean equals(Object object) {
058            if (object instanceof DefaultEndpoint) {
059                DefaultEndpoint that = (DefaultEndpoint) object;
060                return ObjectHelper.equals(this.endpointUri, that.endpointUri);
061            }
062            return false;
063        }
064    
065        @Override
066        public String toString() {
067            return "Endpoint[" + endpointUri + "]";
068        }
069    
070        public String getEndpointUri() {
071            return endpointUri;
072        }
073    
074        public CamelContext getContext() {
075            return context;
076        }
077    
078        public Component getComponent() {
079            return component;
080        }
081    
082        /**
083         * @return the executor
084         */
085        public synchronized ScheduledExecutorService getExecutorService() {
086            if (executorService == null) {
087                Component c = getComponent();
088                if (c != null && c instanceof DefaultComponent) {
089                    DefaultComponent dc = (DefaultComponent) c;
090                    executorService = dc.getExecutorService();
091                }
092                if (executorService == null) {
093                    executorService = createExecutorService();
094                }
095            }
096            return executorService;
097        }
098    
099        /**
100         * @param executorService the executor to set
101         */
102        public synchronized void setExecutorService(ScheduledExecutorService executorService) {
103            this.executorService = executorService;
104        }
105    
106        public PollingConsumer<E> createPollingConsumer() throws Exception {
107            return new DefaultPollingConsumer<E>(this);
108        }
109    
110        /**
111         * Converts the given exchange to the specified exchange type
112         */
113        public E convertTo(Class<E> type, Exchange exchange) {
114            // TODO we could infer type parameter
115            if (type.isInstance(exchange)) {
116                return type.cast(exchange);
117            }
118            return getContext().getExchangeConverter().convertTo(type, exchange);
119        }
120    
121        public E createExchange(Exchange exchange) {
122            Class<E> exchangeType = getExchangeType();
123            if (exchangeType != null) {
124                if (exchangeType.isInstance(exchange)) {
125                    return exchangeType.cast(exchange);
126                }
127            }
128            E answer = createExchange();
129            answer.copyFrom(exchange);
130            return answer;
131        }
132    
133        public E toExchangeType(Exchange exchange) {
134            // TODO avoid cloning exchanges if E == Exchange!
135            return createExchange(exchange);
136        }
137    
138        /**
139         * Returns the type of the exchange which is generated by this component
140         */
141        public Class<E> getExchangeType() {
142            Type type = getClass().getGenericSuperclass();
143            if (type instanceof ParameterizedType) {
144                ParameterizedType parameterizedType = (ParameterizedType) type;
145                Type[] arguments = parameterizedType.getActualTypeArguments();
146                if (arguments.length > 0) {
147                    Type argumentType = arguments[0];
148                    if (argumentType instanceof Class) {
149                        return (Class<E>) argumentType;
150                    }
151                }
152            }
153            return null;
154        }
155    
156        protected ScheduledThreadPoolExecutor createExecutorService() {
157            return new ScheduledThreadPoolExecutor(10);
158        }
159    }