/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;

/**
 * Processor for wire tapping exchanges to an endpoint destination.
 * <p/>
 * Every exchange handed to this processor is copied, the copy is forced to
 * {@link ExchangePattern#InOnly} and the copy is submitted to an
 * {@link ExecutorService} which sends it with the producer. The caller is
 * never made to wait for the outcome of the tap (fire and forget).
 *
 * @version $Revision: 751648 $
 */
public class WireTapProcessor extends SendProcessor {

    // number of threads given to the executor service built by createExecutorService()
    private int defaultThreadPoolSize = 5;
    // created lazily by getExecutorService() unless one was supplied with setExecutorService()
    private ExecutorService executorService;

    /**
     * Creates a wire tap sending to the given endpoint.
     *
     * @param destination the endpoint the tapped exchanges are sent to
     */
    public WireTapProcessor(Endpoint destination) {
        super(destination);
    }

    /**
     * Creates a wire tap sending to the given endpoint.
     *
     * @param destination the endpoint the tapped exchanges are sent to
     * @param pattern     the exchange pattern passed on to the super class; note that
     *                    {@link #configureExchange(Exchange)} always sets the tapped copy to InOnly
     */
    public WireTapProcessor(Endpoint destination, ExchangePattern pattern) {
        super(destination, pattern);
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();
    }

    /**
     * Shuts down the executor service (if one exists) and then stops the super class.
     */
    @Override
    protected void doStop() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
            // must null it so getExecutorService() builds a fresh one if this processor is
            // started again; a shut down executor service rejects every task submitted to it
            executorService = null;
        }
        super.doStop();
    }

    @Override
    public String toString() {
        return "wireTap(" + destination.getEndpointUri() + ")";
    }

    /**
     * Sends a copy of the exchange to the destination using a thread from the executor service.
     * <p/>
     * If there is no producer and this processor is stopped, the exchange is ignored and a
     * warning is logged.
     *
     * @param exchange the exchange to tap; it is copied and the original is not modified
     * @throws IllegalStateException if there is no producer and this processor is not stopped
     * @throws Exception declared by the processor contract
     */
    public void process(Exchange exchange) throws Exception {
        if (producer == null) {
            if (isStopped()) {
                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
            } else {
                throw new IllegalStateException("No producer, this processor has not been started!");
            }
        } else {
            submitWireTap(configureExchange(exchange));
        }
    }

    /**
     * Sends a copy of the exchange to the destination using a thread from the executor service
     * and reports the original exchange as completed synchronously.
     * <p/>
     * If there is no producer, the exchange is either ignored with a warning (processor is
     * stopped) or gets an {@link IllegalStateException} set on it (processor is not stopped).
     *
     * @param exchange the exchange to tap; it is copied exactly once for the tap
     * @param callback notified with <tt>done(true)</tt> before this method returns
     * @return always <tt>true</tt>, the caller should not wait for the wire tap
     */
    public boolean process(Exchange exchange, final AsyncCallback callback) {
        if (producer == null) {
            if (isStopped()) {
                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
            } else {
                exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
            }
            callback.done(true);
            return true;
        } else {
            submitWireTap(configureExchange(exchange));

            // the wire tap is fire and forget, so the callers callback must not be handed to the
            // background task; notify it here, the same way the branch above does, so the
            // "true" returned below and the callback agree with each other
            callback.done(true);
            return true;
        }
    }

    /**
     * Submits a task to the executor service that sends the given exchange with the producer.
     *
     * @param wireTapExchange the already configured copy to send
     */
    private void submitWireTap(final Exchange wireTapExchange) {
        // use submit instead of execute to force it to use a new thread, execute might
        // decide to use the current thread, so we must submit a new task.
        // as we do not care for the response we do not hold the future object and wait for the result
        getExecutorService().submit(new Callable<Object>() {
            public Object call() throws Exception {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Processing wiretap: " + wireTapExchange);
                }
                producer.process(wireTapExchange);
                return null;
            }
        });
    }

    /**
     * Creates the exchange that is sent to the wire tap destination.
     *
     * @param exchange the original exchange
     * @return a copy of the original exchange with its pattern set to InOnly
     */
    @Override
    protected Exchange configureExchange(Exchange exchange) {
        // must use a copy as we do not want it to cause side effects on the original exchange
        Exchange copy = exchange.copy();
        // set MEP to InOnly as this wire tap is a fire and forget
        copy.setPattern(ExchangePattern.InOnly);
        return copy;
    }

    /**
     * Gets the executor service used for sending the tapped exchanges, creating a default one
     * on first use if none has been set.
     * <p/>
     * NOTE(review): the lazy creation is not synchronized - confirm whether the first call can
     * happen concurrently from several threads.
     *
     * @return the executor service, never <tt>null</tt>
     */
    public ExecutorService getExecutorService() {
        if (executorService == null) {
            executorService = createExecutorService();
        }
        return executorService;
    }

    /**
     * Creates the default executor service: a {@link ScheduledThreadPoolExecutor} with
     * <tt>defaultThreadPoolSize</tt> threads, each named with a running number and the
     * {@link #toString()} of this processor.
     */
    private ExecutorService createExecutorService() {
        return new ScheduledThreadPoolExecutor(defaultThreadPoolSize, new ThreadFactory() {
            int counter;

            public synchronized Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable);
                thread.setName("Thread: " + (++counter) + " " + WireTapProcessor.this.toString());
                return thread;
            }
        });
    }

    /**
     * Sets the executor service to use instead of the default one.
     * <p/>
     * Note that {@link #doStop()} shuts down whichever executor service is current,
     * including one supplied here.
     *
     * @param executorService the executor service to use
     */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

}