001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.lang.reflect.ParameterizedType; 020 import java.lang.reflect.Type; 021 import java.util.concurrent.ScheduledExecutorService; 022 import java.util.concurrent.ScheduledThreadPoolExecutor; 023 024 import org.apache.camel.CamelContext; 025 import org.apache.camel.Component; 026 import org.apache.camel.Endpoint; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.PollingConsumer; 029 import org.apache.camel.util.ObjectHelper; 030 031 /** 032 * A default endpoint useful for implementation inheritance 033 * 034 * @version $Revision: 563607 $ 035 */ 036 public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> { 037 private String endpointUri; 038 private CamelContext context; 039 private Component component; 040 private ScheduledExecutorService executorService; 041 042 protected DefaultEndpoint(String endpointUri, Component component) { 043 this(endpointUri, component.getCamelContext()); 044 this.component = component; 045 } 046 047 protected DefaultEndpoint(String endpointUri, CamelContext context) { 048 this.endpointUri = endpointUri; 049 this.context = context; 050 } 051 052 public int hashCode() { 053 return endpointUri.hashCode() * 37 + 1; 054 } 055 056 @Override 057 public boolean equals(Object object) { 058 if (object instanceof DefaultEndpoint) { 059 DefaultEndpoint that = (DefaultEndpoint) object; 060 return ObjectHelper.equals(this.endpointUri, that.endpointUri); 061 } 062 return false; 063 } 064 065 @Override 066 public String toString() { 067 return "Endpoint[" + endpointUri + "]"; 068 } 069 070 public String getEndpointUri() { 071 return endpointUri; 072 } 073 074 public CamelContext getContext() { 075 return context; 076 } 077 078 public Component getComponent() { 079 return component; 080 } 081 082 /** 083 * @return the executor 084 */ 085 public synchronized ScheduledExecutorService getExecutorService() { 086 if (executorService == null) { 087 Component c = getComponent(); 088 if (c != null && c instanceof DefaultComponent) { 089 DefaultComponent dc = (DefaultComponent) c; 090 executorService = dc.getExecutorService(); 091 } 092 if (executorService == null) { 093 executorService = createExecutorService(); 094 } 095 } 096 return executorService; 097 } 098 099 /** 100 * @param executorService the executor to set 101 */ 102 public synchronized void setExecutorService(ScheduledExecutorService executorService) { 103 this.executorService = executorService; 104 } 105 106 public PollingConsumer<E> createPollingConsumer() throws Exception { 107 return new DefaultPollingConsumer<E>(this); 108 } 109 110 /** 111 * Converts the given exchange to the specified exchange type 112 */ 113 public E convertTo(Class<E> type, Exchange exchange) { 114 // TODO we could infer type parameter 115 if (type.isInstance(exchange)) { 116 return type.cast(exchange); 117 } 118 return getContext().getExchangeConverter().convertTo(type, exchange); 119 } 120 121 public E createExchange(Exchange exchange) { 122 Class<E> exchangeType = getExchangeType(); 123 if (exchangeType != null) { 124 if (exchangeType.isInstance(exchange)) { 125 return exchangeType.cast(exchange); 126 } 127 } 128 E answer = createExchange(); 129 answer.copyFrom(exchange); 130 return answer; 131 } 132 133 public E toExchangeType(Exchange exchange) { 134 // TODO avoid cloning exchanges if E == Exchange! 135 return createExchange(exchange); 136 } 137 138 /** 139 * Returns the type of the exchange which is generated by this component 140 */ 141 public Class<E> getExchangeType() { 142 Type type = getClass().getGenericSuperclass(); 143 if (type instanceof ParameterizedType) { 144 ParameterizedType parameterizedType = (ParameterizedType) type; 145 Type[] arguments = parameterizedType.getActualTypeArguments(); 146 if (arguments.length > 0) { 147 Type argumentType = arguments[0]; 148 if (argumentType instanceof Class) { 149 return (Class<E>) argumentType; 150 } 151 } 152 } 153 return null; 154 } 155 156 protected ScheduledThreadPoolExecutor createExecutorService() { 157 return new ScheduledThreadPoolExecutor(10); 158 } 159 }