001    /*
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.queue;
018    
019    import java.util.Queue;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.TimeUnit;
022    import java.util.concurrent.atomic.AtomicBoolean;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.impl.DefaultEndpoint;
028    import org.apache.camel.impl.DefaultExchange;
029    
030    /**
031     * Represents a queue endpoint that uses a {@link BlockingQueue}
032     * object to process inbound exchanges.
033     *
034     * @org.apache.xbean.XBean
035     * @version $Revision: 519973 $
036     */
037    public class QueueEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
038        private BlockingQueue<E> queue;
039            private org.apache.camel.queue.QueueEndpoint.Activation activation;
040    
041        public QueueEndpoint(String uri, CamelContext container, BlockingQueue<E> queue) {
042            super(uri, container);
043            this.queue = queue;
044        }
045    
046        public void onExchange(E exchange) {
047            queue.add(exchange);
048        }
049    
050        public void setInboundProcessor(Processor<E> processor) {
051            // TODO lets start a thread to process inbound requests
052            // if we don't already have one
053        }
054    
055        public E createExchange() {
056            // How can we create a specific Exchange if we are generic??
057            // perhaps it would be better if we did not implement this. 
058            return (E) new DefaultExchange(getContext());
059        }
060    
061        public Queue<E> getQueue() {
062            return queue;
063        }
064        
065        class Activation implements Runnable {
066                    AtomicBoolean stop = new AtomicBoolean();
067                    private Thread thread;
068                    
069                    public void run() {
070                            while(!stop.get()) {
071                                    E exchange=null;
072                                    try {
073                                            exchange = queue.poll(100, TimeUnit.MILLISECONDS);
074                                    } catch (InterruptedException e) {
075                                            break;
076                                    }
077                                    if( exchange !=null ) {
078                                            try {
079                                                    getInboundProcessor().onExchange(exchange);
080                                            } catch (Throwable e) {
081                                                    e.printStackTrace();
082                                            }
083                                    }
084                            }
085                    }
086    
087                    public void start() {
088                            thread = new Thread(this, getEndpointUri());
089                            thread.setDaemon(true);
090                            thread.start();
091                    }
092    
093                    public void stop() throws InterruptedException {
094                            stop.set(true);
095                            thread.join();
096                    }
097                    
098                    @Override
099                    public String toString() {
100                            return "Activation: "+getEndpointUri();
101                    }
102        }
103    
104        @Override
105        protected void doActivate() {
106                    activation = new Activation();
107                    activation.start();
108        }
109        
110        @Override
111        protected void doDeactivate() {
112                    try {
113                            activation.stop();
114                            activation=null;
115                    } catch (InterruptedException e) {
116                            throw new RuntimeException(e);
117                    }
118        }
119    }