001 /** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one or more 004 * contributor license agreements. See the NOTICE file distributed with 005 * this work for additional information regarding copyright ownership. 006 * The ASF licenses this file to You under the Apache License, Version 2.0 007 * (the "License"); you may not use this file except in compliance with 008 * the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.camel.component.file; 019 020 import org.apache.camel.Component; 021 import org.apache.camel.Consumer; 022 import org.apache.camel.Processor; 023 import org.apache.camel.Producer; 024 import org.apache.camel.impl.PollingEndpoint; 025 import org.apache.camel.util.IntrospectionSupport; 026 027 import java.io.File; 028 import java.util.Map; 029 import java.util.concurrent.ScheduledExecutorService; 030 import java.util.concurrent.ScheduledThreadPoolExecutor; 031 032 /** 033 * @version $Revision: 523016 $ 034 */ 035 public class FileEndpoint extends PollingEndpoint<FileExchange> { 036 private File file; 037 private ScheduledExecutorService executor; 038 039 protected FileEndpoint(File file, String endpointUri, FileComponent component) { 040 super(endpointUri, component); 041 this.file = file; 042 this.executor = component.getExecutorService(); 043 } 044 045 046 /** 047 * @return a Producer 048 * @throws Exception 049 * @see org.apache.camel.Endpoint#createProducer() 050 */ 051 public Producer<FileExchange> createProducer() throws Exception { 052 Producer<FileExchange> result = new FileProducer(this); 053 return startService(result); 054 } 055 056 /** 057 * @param file 058 * @return a Consumer 059 * @throws Exception 060 * @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor) 061 */ 062 public Consumer<FileExchange> createConsumer(Processor<FileExchange> file) throws Exception { 063 Consumer<FileExchange> result = new FileConsumer(this, file, getExecutor()); 064 configureConsumer(result); 065 return startService(result); 066 } 067 068 /** 069 * @param file 070 * @return a FileExchange 071 * @see org.apache.camel.Endpoint#createExchange() 072 */ 073 public FileExchange createExchange(File file) { 074 return new FileExchange(getContext(), file); 075 } 076 077 /** 078 * @return an Exchange 079 * @see org.apache.camel.Endpoint#createExchange() 080 */ 081 public FileExchange createExchange() { 082 return createExchange(this.file); 083 } 084 085 /** 086 * @return the executor 087 */ 088 public synchronized ScheduledExecutorService getExecutor() { 089 if (this.executor == null) { 090 this.executor = new ScheduledThreadPoolExecutor(10); 091 } 092 return executor; 093 } 094 095 /** 096 * @param executor the executor to set 097 */ 098 public synchronized void setExecutor(ScheduledExecutorService executor) { 099 this.executor = executor; 100 } 101 102 public File getFile() { 103 return file; 104 } 105 106 public boolean isSingleton() { 107 return true; 108 } 109 }