001 /* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.queue; 018 019 import org.apache.camel.CamelContext; 020 import org.apache.camel.Component; 021 import org.apache.camel.Endpoint; 022 import org.apache.camel.EndpointResolver; 023 import org.apache.camel.Exchange; 024 import org.apache.camel.util.ObjectHelper; 025 026 import java.util.concurrent.BlockingQueue; 027 import java.util.concurrent.Callable; 028 029 /** 030 * An implementation of {@link EndpointResolver} that creates 031 * {@link QueueEndpoint} objects. 032 * 033 * The syntax for a Queue URI looks like: 034 * 035 * <pre><code>queue:[component:]queuename</code></pre> 036 * the component is optional, and if it is not specified, the default component name 037 * is assumed. 038 * 039 * @version $Revision: 519901 $ 040 */ 041 public class QueueEndpointResolver<E extends Exchange> implements EndpointResolver<E> { 042 043 public static final String DEFAULT_COMPONENT_NAME = QueueComponent.class.getName(); 044 045 /** 046 * Finds the {@see QueueComponent} specified by the uri. If the {@see QueueComponent} 047 * object do not exist, it will be created. 048 * 049 * @see org.apache.camel.EndpointResolver#resolveComponent(org.apache.camel.CamelContext, java.lang.String) 050 */ 051 public Component resolveComponent(CamelContext container, String uri) { 052 String id[] = getEndpointId(uri); 053 return resolveQueueComponent(container, id[0]); 054 } 055 056 /** 057 * Finds the {@see QueueEndpoint} specified by the uri. If the {@see QueueEndpoint} or it's associated 058 * {@see QueueComponent} object do not exist, they will be created. 059 * 060 * @see org.apache.camel.EndpointResolver#resolveEndpoint(org.apache.camel.CamelContext, java.lang.String) 061 */ 062 public Endpoint<E> resolveEndpoint(CamelContext container, String uri) { 063 String id[] = getEndpointId(uri); 064 QueueComponent<E> component = resolveQueueComponent(container, id[0]); 065 BlockingQueue<E> queue = component.getOrCreateQueue(id[1]); 066 return new QueueEndpoint<E>(uri, container, queue); 067 } 068 069 /** 070 * @return an array that looks like: [componentName,endpointName] 071 */ 072 private String[] getEndpointId(String uri) { 073 String rc [] = {DEFAULT_COMPONENT_NAME, null}; 074 String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 3); 075 if( splitURI[2] != null ) { 076 rc[0] = splitURI[1]; 077 rc[1] = splitURI[2]; 078 } else { 079 rc[0] = splitURI[1]; 080 } 081 return rc; 082 } 083 084 @SuppressWarnings("unchecked") 085 private QueueComponent<E> resolveQueueComponent(CamelContext container, String componentName) { 086 Component rc = container.getOrCreateComponent(componentName, new Callable<Component>(){ 087 public Component<E> call() throws Exception { 088 return new QueueComponent<E>(); 089 }}); 090 return (QueueComponent<E>) rc; 091 } 092 093 }