001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.concurrent.BlockingQueue;
020    import java.util.concurrent.CountDownLatch;
021    
022    import org.apache.camel.Exchange;
023    import org.apache.camel.WaitForTaskToComplete;
024    import org.apache.camel.impl.SynchronizationAdapter;
025    import org.apache.camel.util.ExchangeHelper;
026    
027    /**
028     * @version $Revision: 780267 $
029     */
030    public class SedaProducer extends CollectionProducer {
031        private final SedaEndpoint endpoint;
032        private final WaitForTaskToComplete waitForTaskToComplete;
033    
034        public SedaProducer(SedaEndpoint endpoint, BlockingQueue<Exchange> queue, WaitForTaskToComplete waitForTaskToComplete) {
035            super(endpoint, queue);
036            this.endpoint = endpoint;
037            this.waitForTaskToComplete = waitForTaskToComplete;
038        }
039    
040        @Override
041        public void process(final Exchange exchange) throws Exception {
042            // use a new copy of the exchange to route async and handover the on completion to the new copy
043            // so its the new copy that performs the on completion callback when its done
044            Exchange copy = exchange.newCopy(true);
045            // set a new from endpoint to be the seda queue
046            copy.setFromEndpoint(endpoint);
047    
048            WaitForTaskToComplete wait = waitForTaskToComplete;
049            if (exchange.getIn().getHeader(Exchange.ASYNC_WAIT) != null) {
050                wait = exchange.getIn().getHeader(Exchange.ASYNC_WAIT, WaitForTaskToComplete.class);
051            }
052    
053            if (wait == WaitForTaskToComplete.Always
054                || (wait == WaitForTaskToComplete.IfReplyExpected && ExchangeHelper.isOutCapable(exchange))) {
055    
056                // only check for if there is a consumer if its the seda endpoint where we exepect a consumer in the same
057                // camel context. If you use the vm component the consumer could be in another camel context.
058                // for seda we want to check that a consumer exists otherwise we end up waiting forever for the response.
059                if (endpoint.getEndpointUri().startsWith("seda") && endpoint.getConsumers().isEmpty()) {
060                    throw new IllegalStateException("Cannot send to endpoint: " + endpoint.getEndpointUri() + " as no consumers is registered."
061                        + " With no consumers we end up waiting forever for the reply, as there are no consumers to process our exchange: " + exchange);
062                }
063    
064                // latch that waits until we are complete
065                final CountDownLatch latch = new CountDownLatch(1);
066    
067                // we should wait for the reply so install a on completion so we know when its complete
068                copy.addOnCompletion(new SynchronizationAdapter() {
069                    @Override
070                    public void onDone(Exchange response) {
071                        try {
072                            ExchangeHelper.copyResults(exchange, response);
073                        } finally {
074                            // always ensure latch is triggered
075                            latch.countDown();
076                        }
077                    }
078                });
079    
080                queue.add(copy);
081                latch.await();
082            } else {
083                // no wait, eg its a InOnly then just add to queue and return
084                queue.add(copy);
085            }
086        }
087    
088        @Override
089        protected void doStart() throws Exception {
090            super.doStart();
091            endpoint.onStarted(this);
092        }
093    
094        @Override
095        protected void doStop() throws Exception {
096            endpoint.onStopped(this);
097            super.doStop();
098        }
099    }