001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.queue; 019 020 import org.apache.camel.AlreadyStoppedException; 021 import org.apache.camel.Consumer; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Processor; 024 import org.apache.camel.impl.ServiceSupport; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 028 import java.util.concurrent.TimeUnit; 029 030 /** 031 * @version $Revision: 553312 $ 032 */ 033 public class QueueEndpointConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable { 034 private static final Log log = LogFactory.getLog(QueueEndpointConsumer.class); 035 private static int counter; 036 037 private QueueEndpoint<E> endpoint; 038 private Processor processor; 039 private Thread thread; 040 041 public QueueEndpointConsumer(QueueEndpoint<E> endpoint, Processor processor) { 042 this.endpoint = endpoint; 043 this.processor = processor; 044 } 045 046 @Override 047 public String toString() { 048 return "QueueEndpointConsumer: " + endpoint.getEndpointUri(); 049 } 050 051 public void run() { 052 while (!isStopping()) { 053 E exchange; 054 try { 055 exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS); 056 } 057 catch (InterruptedException e) { 058 break; 059 } 060 if (exchange != null && !isStopping()) { 061 try { 062 processor.process(exchange); 063 } 064 catch (AlreadyStoppedException e) { 065 log.debug("Ignoring failed message due to shutdown: " + e, e); 066 break; 067 } 068 catch (Throwable e) { 069 log.error(e); 070 } 071 } 072 } 073 } 074 075 protected void doStart() throws Exception { 076 thread = new Thread(this, endpoint.getEndpointUri() + " thread:" + nextCounter()); 077 thread.setDaemon(true); 078 thread.start(); 079 } 080 081 protected void doStop() throws Exception { 082 thread.join(); 083 } 084 085 protected static synchronized int nextCounter() { 086 return ++counter; 087 } 088 }