001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.lang.reflect.Method;
022    import java.util.Comparator;
023    import java.util.HashMap;
024    import java.util.Map;
025    
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Component;
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Expression;
030    import org.apache.camel.Message;
031    import org.apache.camel.Processor;
032    import org.apache.camel.impl.ScheduledPollEndpoint;
033    import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
034    import org.apache.camel.spi.FactoryFinder;
035    import org.apache.camel.spi.IdempotentRepository;
036    import org.apache.camel.spi.Language;
037    import org.apache.camel.util.FileUtil;
038    import org.apache.camel.util.ObjectHelper;
039    import org.apache.camel.util.UuidGenerator;
040    import org.apache.commons.logging.Log;
041    import org.apache.commons.logging.LogFactory;
042    
043    /**
044     * Generic FileEndpoint
045     */
046    public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint {
047    
048        protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory";
049        protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000;
050    
051        protected final transient Log log = LogFactory.getLog(getClass());
052    
053        protected GenericFileProcessStrategy<T> processStrategy;
054        protected GenericFileConfiguration configuration;
055    
056        protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository();
057        protected String localWorkDirectory;
058        protected boolean autoCreate = true;
059        protected int bufferSize = 128 * 1024;
060        protected GenericFileExist fileExist = GenericFileExist.Override;
061        protected boolean noop;
062        protected boolean recursive;
063        protected boolean delete;
064        protected boolean flatten;
065        protected int maxMessagesPerPoll;
066        protected String tempPrefix;
067        protected String include;
068        protected String exclude;
069        protected Expression fileName;
070        protected Expression move;
071        protected Expression preMove;
072        protected boolean idempotent;
073        protected IdempotentRepository idempotentRepository;
074        protected GenericFileFilter<T> filter;
075        protected Comparator<GenericFile<T>> sorter;
076        protected Comparator<GenericFileExchange> sortBy;
077        protected String readLock = "none";
078        protected long readLockTimeout;
079        protected GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy;
080    
081        public GenericFileEndpoint() {
082        }
083    
084        public GenericFileEndpoint(String endpointUri, Component component) {
085            super(endpointUri, component);
086        }
087    
088        public boolean isSingleton() {
089            return true;
090        }
091    
092        public abstract GenericFileConsumer<T> createConsumer(Processor processor) throws Exception;
093    
094        public abstract GenericFileProducer<T> createProducer() throws Exception;
095    
096        public abstract GenericFileExchange<T> createExchange(GenericFile<T> file);
097    
098        public abstract String getScheme();
099        
100        public abstract char getFileSeparator();
101        
102        public abstract boolean isAbsolute(String name);
103        
104        /**
105         * Return the file name that will be auto-generated for the given message if
106         * none is provided
107         */
108        public String getGeneratedFileName(Message message) {
109            return UuidGenerator.generateSanitizedId(message.getMessageId());
110        }
111    
112        public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() {
113            if (processStrategy == null) {
114                processStrategy = createGenericFileStrategy();
115                if (log.isDebugEnabled()) {
116                    log.debug("Using Generic file process strategy: " + processStrategy);
117                }
118            }
119            return processStrategy;
120        }
121    
122        /**
123         * A strategy method to lazily create the file strategy
124         */
125        @SuppressWarnings("unchecked")
126        protected GenericFileProcessStrategy<T> createGenericFileStrategy() {
127            Class<?> factory = null;
128            try {
129                FactoryFinder finder = getCamelContext().getFactoryFinder("META-INF/services/org/apache/camel/component/");
130                factory = finder.findClass(getScheme(), "strategy.factory.");
131            } catch (ClassNotFoundException e) {
132                log.debug("'strategy.factory.class' not found", e);
133            } catch (IOException e) {
134                log.debug("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e);
135            }
136    
137            if (factory == null) {
138                // use default
139                factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS);
140                if (factory == null) {
141                    throw new TypeNotPresentException(DEFAULT_STRATEGYFACTORY_CLASS + " class not found", null);
142                }
143            }
144    
145            try {
146                Method factoryMethod = factory.getMethod("createGenericFileProcessStrategy", CamelContext.class, Map.class);
147                return (GenericFileProcessStrategy<T>) ObjectHelper.invokeMethod(factoryMethod, null, getCamelContext(), getParamsAsMap());
148            } catch (NoSuchMethodException e) {
149                throw new TypeNotPresentException(factory.getSimpleName() + ".createGenericFileProcessStrategy method not found", e);
150            }
151        }
152    
153        public void setGenericFileProcessStrategy(GenericFileProcessStrategy<T> genericFileProcessStrategy) {
154            this.processStrategy = genericFileProcessStrategy;
155        }
156    
157        public boolean isNoop() {
158            return noop;
159        }
160    
161        public void setNoop(boolean noop) {
162            this.noop = noop;
163        }
164    
165        public boolean isRecursive() {
166            return recursive;
167        }
168    
169        public void setRecursive(boolean recursive) {
170            this.recursive = recursive;
171        }
172    
173        public String getInclude() {
174            return include;
175        }
176    
177        public void setInclude(String include) {
178            this.include = include;
179        }
180    
181        public String getExclude() {
182            return exclude;
183        }
184    
185        public void setExclude(String exclude) {
186            this.exclude = exclude;
187        }
188    
189        public boolean isDelete() {
190            return delete;
191        }
192    
193        public void setDelete(boolean delete) {
194            this.delete = delete;
195        }
196    
197        public boolean isFlatten() {
198            return flatten;
199        }
200    
201        public void setFlatten(boolean flatten) {
202            this.flatten = flatten;
203        }
204    
205        public Expression getMove() {
206            return move;
207        }
208    
209        public void setMove(Expression move) {
210            this.move = move;
211        }
212    
213        /**
214         * Sets the move expression based on
215         * {@link org.apache.camel.language.simple.FileLanguage}
216         */
217        public void setMove(String fileLanguageExpression) {
218            String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
219            this.move = createFileLangugeExpression(expression);
220        }
221    
222        public Expression getPreMove() {
223            return preMove;
224        }
225    
226        public void setPreMove(Expression preMove) {
227            this.preMove = preMove;
228        }
229    
230        /**
231         * Sets the pre move expression based on
232         * {@link org.apache.camel.language.simple.FileLanguage}
233         */
234        public void setPreMove(String fileLanguageExpression) {
235            String expression = configureMoveOrPreMoveExpression(fileLanguageExpression);
236            this.preMove = createFileLangugeExpression(expression);
237        }
238    
239        public Expression getFileName() {
240            return fileName;
241        }
242    
243        public void setFileName(Expression fileName) {
244            this.fileName = fileName;
245        }
246    
247        /**
248         * Sets the file expression based on
249         * {@link org.apache.camel.language.simple.FileLanguage}
250         */
251        public void setFileName(String fileLanguageExpression) {
252            this.fileName = createFileLangugeExpression(fileLanguageExpression);
253        }
254    
255        public boolean isIdempotent() {
256            return idempotent;
257        }
258    
259        public void setIdempotent(boolean idempotent) {
260            this.idempotent = idempotent;
261        }
262    
263        public IdempotentRepository getIdempotentRepository() {
264            return idempotentRepository;
265        }
266    
267        public void setIdempotentRepository(IdempotentRepository idempotentRepository) {
268            this.idempotentRepository = idempotentRepository;
269        }
270    
271        public GenericFileFilter<T> getFilter() {
272            return filter;
273        }
274    
275        public void setFilter(GenericFileFilter<T> filter) {
276            this.filter = filter;
277        }
278    
279        public Comparator<GenericFile<T>> getSorter() {
280            return sorter;
281        }
282    
283        public void setSorter(Comparator<GenericFile<T>> sorter) {
284            this.sorter = sorter;
285        }
286    
287        public Comparator<GenericFileExchange> getSortBy() {
288            return sortBy;
289        }
290    
291        public void setSortBy(Comparator<GenericFileExchange> sortBy) {
292            this.sortBy = sortBy;
293        }
294    
295        public void setSortBy(String expression) {
296            setSortBy(expression, false);
297        }
298    
299        public void setSortBy(String expression, boolean reverse) {
300            setSortBy(GenericFileDefaultSorter.sortByFileLanguage(getCamelContext(), expression, reverse));
301        }
302    
303        public String getTempPrefix() {
304            return tempPrefix;
305        }
306    
307        /**
308         * Enables and uses temporary prefix when writing files, after write it will
309         * be renamed to the correct name.
310         */
311        public void setTempPrefix(String tempPrefix) {
312            this.tempPrefix = tempPrefix;
313        }
314    
315        public GenericFileConfiguration getConfiguration() {
316            if (configuration == null) {
317                configuration = new GenericFileConfiguration();
318            }
319            return configuration;
320        }
321    
322        public void setConfiguration(GenericFileConfiguration configuration) {
323            this.configuration = configuration;
324        }
325    
326        public GenericFileExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
327            return exclusiveReadLockStrategy;
328        }
329    
330        public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy) {
331            this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
332        }
333    
334        public String getReadLock() {
335            return readLock;
336        }
337    
338        public void setReadLock(String readLock) {
339            this.readLock = readLock;
340        }
341    
342        public long getReadLockTimeout() {
343            return readLockTimeout;
344        }
345    
346        public void setReadLockTimeout(long readLockTimeout) {
347            this.readLockTimeout = readLockTimeout;
348        }
349    
350        public int getBufferSize() {
351            return bufferSize;
352        }
353    
354        public void setBufferSize(int bufferSize) {
355            this.bufferSize = bufferSize;
356        }
357    
358        public GenericFileExist getFileExist() {
359            return fileExist;
360        }
361    
362        public void setFileExist(GenericFileExist fileExist) {
363            this.fileExist = fileExist;
364        }
365    
366        public boolean isAutoCreate() {
367            return autoCreate;
368        }
369    
370        public void setAutoCreate(boolean autoCreate) {
371            this.autoCreate = autoCreate;
372        }
373    
374        public GenericFileProcessStrategy<T> getProcessStrategy() {
375            return processStrategy;
376        }
377    
378        public void setProcessStrategy(GenericFileProcessStrategy<T> processStrategy) {
379            this.processStrategy = processStrategy;
380        }
381    
382        public String getLocalWorkDirectory() {
383            return localWorkDirectory;
384        }
385    
386        public void setLocalWorkDirectory(String localWorkDirectory) {
387            this.localWorkDirectory = localWorkDirectory;
388        }
389    
390        public int getMaxMessagesPerPoll() {
391            return maxMessagesPerPoll;
392        }
393    
394        public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
395            this.maxMessagesPerPoll = maxMessagesPerPoll;
396        }
397    
398        public IdempotentRepository<String> getInProgressRepository() {
399            return inProgressRepository;
400        }
401    
402        public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) {
403            this.inProgressRepository = inProgressRepository;
404        }
405    
406        /**
407         * Configures the given message with the file which sets the body to the
408         * file object.
409         */
410        public void configureMessage(GenericFile<T> file, Message message) {
411            message.setBody(file);
412    
413            if (flatten) {
414                // when flatten the file name should not contain any paths
415                message.setHeader(Exchange.FILE_NAME, file.getFileNameOnly());
416            } else {
417                // compute name to set on header that should be relative to starting directory
418                String name = file.isAbsolute() ? file.getAbsoluteFilePath() : file.getRelativeFilePath();
419    
420                // skip leading endpoint configured directory
421                String endpointPath = getConfiguration().getDirectory();
422                if (ObjectHelper.isNotEmpty(endpointPath) && name.startsWith(endpointPath)) {
423                    name = ObjectHelper.after(name, getConfiguration().getDirectory() + File.separator);
424                }
425    
426                // adjust filename
427                message.setHeader(Exchange.FILE_NAME, name);
428            }
429        }
430    
431        /**
432         * Strategy to configure the move or premove option based on a String input.
433         * <p/>
434         * @param expression the original string input
435         * @return configured string or the original if no modifications is needed
436         */
437        protected String configureMoveOrPreMoveExpression(String expression) {
438            // if the expression already have ${ } placeholders then pass it unmodified
439            if (expression.indexOf("${") != -1) {
440                return expression;
441            }
442    
443            // remove trailing slash
444            expression = FileUtil.stripTrailingSeparator(expression);
445    
446            StringBuilder sb = new StringBuilder();
447    
448            // if relative then insert start with the parent folder
449            if (!isAbsolute(expression)) {
450                sb.append("${file:parent}");
451                sb.append(getFileSeparator());
452            }
453            // insert the directory the end user provided
454            sb.append(expression);
455            // append only the filename (file:name can contain a relative path, so we must use onlyname)
456            sb.append(getFileSeparator());
457            sb.append("${file:onlyname}");
458    
459            return sb.toString();
460        }
461    
462        protected Map<String, Object> getParamsAsMap() {
463            Map<String, Object> params = new HashMap<String, Object>();
464    
465            if (isNoop()) {
466                params.put("noop", Boolean.toString(true));
467            }
468            if (isDelete()) {
469                params.put("delete", Boolean.toString(true));
470            }
471            if (move != null) {
472                params.put("move", move);
473            }
474            if (preMove != null) {
475                params.put("preMove", preMove);
476            }
477            if (exclusiveReadLockStrategy != null) {
478                params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
479            }
480            if (readLock != null) {
481                params.put("readLock", readLock);
482            }
483            if (readLockTimeout > 0) {
484                params.put("readLockTimeout", readLockTimeout);
485            }
486    
487            return params;
488        }
489    
490        private Expression createFileLangugeExpression(String expression) {
491            Language language = getCamelContext().resolveLanguage("file");
492            return language.createExpression(expression);
493        }
494    
495    }