001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import org.apache.camel.CamelContext; 020 import org.apache.camel.Component; 021 import org.apache.camel.Endpoint; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.PollingConsumer; 024 import org.apache.camel.util.ObjectHelper; 025 026 import java.lang.reflect.ParameterizedType; 027 import java.lang.reflect.Type; 028 import java.util.concurrent.ScheduledExecutorService; 029 import java.util.concurrent.ScheduledThreadPoolExecutor; 030 031 /** 032 * A default endpoint useful for implementation inheritance 033 * 034 * @version $Revision: 541335 $ 035 */ 036 public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> { 037 private String endpointUri; 038 private final Component component; 039 private CamelContext context; 040 private ScheduledExecutorService executorService; 041 042 protected DefaultEndpoint(String endpointUri, Component component) { 043 this.endpointUri = endpointUri; 044 this.component = component; 045 this.context = component.getCamelContext(); 046 } 047 048 public int hashCode() { 049 return endpointUri.hashCode() * 37 + 1; 050 } 051 052 @Override 053 public boolean equals(Object object) { 054 if (object instanceof DefaultEndpoint) { 055 DefaultEndpoint that = (DefaultEndpoint) object; 056 return ObjectHelper.equals(this.endpointUri, that.endpointUri); 057 } 058 return false; 059 } 060 061 @Override 062 public String toString() { 063 return "Endpoint[" + endpointUri + "]"; 064 } 065 066 public String getEndpointUri() { 067 return endpointUri; 068 } 069 070 public CamelContext getContext() { 071 return context; 072 } 073 074 public Component getComponent() { 075 return component; 076 } 077 078 /** 079 * @return the executor 080 */ 081 public synchronized ScheduledExecutorService getExecutorService() { 082 if (executorService == null) { 083 Component c = getComponent(); 084 if (c != null && c instanceof DefaultComponent) { 085 DefaultComponent dc = (DefaultComponent) c; 086 executorService = dc.getExecutorService(); 087 } 088 if (executorService == null) { 089 executorService = createExecutorService(); 090 } 091 } 092 return executorService; 093 } 094 095 /** 096 * @param executorService the executor to set 097 */ 098 public synchronized void setExecutorService(ScheduledExecutorService executorService) { 099 this.executorService = executorService; 100 } 101 102 public PollingConsumer<E> createPollingConsumer() throws Exception { 103 return new DefaultPollingConsumer<E>(this); 104 } 105 106 /** 107 * Converts the given exchange to the specified exchange type 108 */ 109 public E convertTo(Class<E> type, Exchange exchange) { 110 // TODO we could infer type parameter 111 if (type.isInstance(exchange)) { 112 return type.cast(exchange); 113 } 114 return getContext().getExchangeConverter().convertTo(type, exchange); 115 } 116 117 public E createExchange(Exchange exchange) { 118 Class<E> exchangeType = getExchangeType(); 119 if (exchangeType != null) { 120 if (exchangeType.isInstance(exchange)) { 121 return exchangeType.cast(exchange); 122 } 123 } 124 E answer = createExchange(); 125 answer.copyFrom(exchange); 126 return answer; 127 } 128 129 public E toExchangeType(Exchange exchange) { 130 // TODO avoid cloning exchanges if E == Exchange! 131 return createExchange(exchange); 132 } 133 134 /** 135 * Returns the type of the exchange which is generated by this component 136 */ 137 public Class<E> getExchangeType() { 138 Type type = getClass().getGenericSuperclass(); 139 if (type instanceof ParameterizedType) { 140 ParameterizedType parameterizedType = (ParameterizedType) type; 141 Type[] arguments = parameterizedType.getActualTypeArguments(); 142 if (arguments.length > 0) { 143 Type argumentType = arguments[0]; 144 if (argumentType instanceof Class) { 145 return (Class<E>) argumentType; 146 } 147 } 148 } 149 return null; 150 } 151 152 protected ScheduledThreadPoolExecutor createExecutorService() { 153 return new ScheduledThreadPoolExecutor(10); 154 } 155 }