001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.queue; 019 020 import org.apache.camel.AlreadyStoppedException; 021 import org.apache.camel.Consumer; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Processor; 024 import org.apache.camel.impl.ServiceSupport; 025 import org.apache.commons.logging.Log; 026 import org.apache.commons.logging.LogFactory; 027 028 import java.util.concurrent.TimeUnit; 029 030 /** 031 * @version $Revision: 553477 $ 032 */ 033 public class QueueConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E>, Runnable { 034 private static final Log log = LogFactory.getLog(QueueConsumer.class); 035 036 private QueueEndpoint<E> endpoint; 037 private Processor processor; 038 private Thread thread; 039 040 public QueueConsumer(QueueEndpoint<E> endpoint, Processor processor) { 041 this.endpoint = endpoint; 042 this.processor = processor; 043 } 044 045 @Override 046 public String toString() { 047 return "QueueConsumer: " + endpoint.getEndpointUri(); 048 } 049 050 public void run() { 051 while (!isStopping()) { 052 E exchange; 053 try { 054 exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS); 055 } 056 catch (InterruptedException e) { 057 break; 058 } 059 if (exchange != null && !isStopping()) { 060 try { 061 processor.process(exchange); 062 } 063 catch (AlreadyStoppedException e) { 064 log.debug("Ignoring failed message due to shutdown: " + e, e); 065 break; 066 } 067 catch (Throwable e) { 068 log.error(e); 069 } 070 } 071 } 072 } 073 074 protected void doStart() throws Exception { 075 thread = new Thread(this, getThreadName(endpoint.getEndpointUri())); 076 thread.setDaemon(true); 077 thread.start(); 078 } 079 080 protected void doStop() throws Exception { 081 thread.join(); 082 } 083 084 }