001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.lang.reflect.Method; 022 import java.util.Comparator; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Component; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Expression; 030 import org.apache.camel.Message; 031 import org.apache.camel.Processor; 032 import org.apache.camel.impl.ScheduledPollEndpoint; 033 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; 034 import org.apache.camel.spi.FactoryFinder; 035 import org.apache.camel.spi.IdempotentRepository; 036 import org.apache.camel.spi.Language; 037 import org.apache.camel.util.FileUtil; 038 import org.apache.camel.util.ObjectHelper; 039 import org.apache.camel.util.UuidGenerator; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 /** 044 * Generic FileEndpoint 045 */ 046 public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint { 047 048 protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory"; 049 protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000; 050 051 protected final transient Log log = LogFactory.getLog(getClass()); 052 053 protected GenericFileProcessStrategy<T> processStrategy; 054 protected GenericFileConfiguration configuration; 055 056 protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository(); 057 protected String localWorkDirectory; 058 protected boolean autoCreate = true; 059 protected int bufferSize = 128 * 1024; 060 protected GenericFileExist fileExist = GenericFileExist.Override; 061 protected boolean noop; 062 protected boolean recursive; 063 protected boolean delete; 064 protected boolean flatten; 065 protected int maxMessagesPerPoll; 066 protected String tempPrefix; 067 protected String include; 068 protected String exclude; 069 protected Expression fileName; 070 protected Expression move; 071 protected Expression preMove; 072 protected boolean idempotent; 073 protected IdempotentRepository idempotentRepository; 074 protected GenericFileFilter<T> filter; 075 protected Comparator<GenericFile<T>> sorter; 076 protected Comparator<Exchange> sortBy; 077 protected String readLock = "none"; 078 protected long readLockTimeout; 079 protected GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy; 080 081 public GenericFileEndpoint() { 082 } 083 084 public GenericFileEndpoint(String endpointUri, Component component) { 085 super(endpointUri, component); 086 } 087 088 public boolean isSingleton() { 089 return true; 090 } 091 092 public abstract GenericFileConsumer<T> createConsumer(Processor processor) throws Exception; 093 094 public abstract GenericFileProducer<T> createProducer() throws Exception; 095 096 public abstract Exchange createExchange(GenericFile<T> file); 097 098 public abstract String getScheme(); 099 100 public abstract char getFileSeparator(); 101 102 public abstract boolean isAbsolute(String name); 103 104 /** 105 * Return the file name that will be auto-generated for the given message if 106 * none is provided 107 */ 108 public String getGeneratedFileName(Message message) { 109 return UuidGenerator.generateSanitizedId(message.getMessageId()); 110 } 111 112 public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() { 113 if (processStrategy == null) { 114 processStrategy = createGenericFileStrategy(); 115 if (log.isDebugEnabled()) { 116 log.debug("Using Generic file process strategy: " + processStrategy); 117 } 118 } 119 return processStrategy; 120 } 121 122 /** 123 * A strategy method to lazily create the file strategy 124 */ 125 @SuppressWarnings("unchecked") 126 protected GenericFileProcessStrategy<T> createGenericFileStrategy() { 127 Class<?> factory = null; 128 try { 129 FactoryFinder finder = getCamelContext().getFactoryFinder("META-INF/services/org/apache/camel/component/"); 130 factory = finder.findClass(getScheme(), "strategy.factory."); 131 } catch (ClassNotFoundException e) { 132 log.debug("'strategy.factory.class' not found", e); 133 } catch (IOException e) { 134 log.debug("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e); 135 } 136 137 if (factory == null) { 138 // use default 139 factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS); 140 if (factory == null) { 141 throw new TypeNotPresentException(DEFAULT_STRATEGYFACTORY_CLASS + " class not found", null); 142 } 143 } 144 145 try { 146 Method factoryMethod = factory.getMethod("createGenericFileProcessStrategy", CamelContext.class, Map.class); 147 return (GenericFileProcessStrategy<T>) ObjectHelper.invokeMethod(factoryMethod, null, getCamelContext(), getParamsAsMap()); 148 } catch (NoSuchMethodException e) { 149 throw new TypeNotPresentException(factory.getSimpleName() + ".createGenericFileProcessStrategy method not found", e); 150 } 151 } 152 153 public boolean isNoop() { 154 return noop; 155 } 156 157 public void setNoop(boolean noop) { 158 this.noop = noop; 159 } 160 161 public boolean isRecursive() { 162 return recursive; 163 } 164 165 public void setRecursive(boolean recursive) { 166 this.recursive = recursive; 167 } 168 169 public String getInclude() { 170 return include; 171 } 172 173 public void setInclude(String include) { 174 this.include = include; 175 } 176 177 public String getExclude() { 178 return exclude; 179 } 180 181 public void setExclude(String exclude) { 182 this.exclude = exclude; 183 } 184 185 public boolean isDelete() { 186 return delete; 187 } 188 189 public void setDelete(boolean delete) { 190 this.delete = delete; 191 } 192 193 public boolean isFlatten() { 194 return flatten; 195 } 196 197 public void setFlatten(boolean flatten) { 198 this.flatten = flatten; 199 } 200 201 public Expression getMove() { 202 return move; 203 } 204 205 public void setMove(Expression move) { 206 this.move = move; 207 } 208 209 /** 210 * Sets the move expression based on 211 * {@link org.apache.camel.language.simple.FileLanguage} 212 */ 213 public void setMove(String fileLanguageExpression) { 214 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression); 215 this.move = createFileLangugeExpression(expression); 216 } 217 218 public Expression getPreMove() { 219 return preMove; 220 } 221 222 public void setPreMove(Expression preMove) { 223 this.preMove = preMove; 224 } 225 226 /** 227 * Sets the pre move expression based on 228 * {@link org.apache.camel.language.simple.FileLanguage} 229 */ 230 public void setPreMove(String fileLanguageExpression) { 231 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression); 232 this.preMove = createFileLangugeExpression(expression); 233 } 234 235 public Expression getFileName() { 236 return fileName; 237 } 238 239 public void setFileName(Expression fileName) { 240 this.fileName = fileName; 241 } 242 243 /** 244 * Sets the file expression based on 245 * {@link org.apache.camel.language.simple.FileLanguage} 246 */ 247 public void setFileName(String fileLanguageExpression) { 248 this.fileName = createFileLangugeExpression(fileLanguageExpression); 249 } 250 251 public boolean isIdempotent() { 252 return idempotent; 253 } 254 255 public void setIdempotent(boolean idempotent) { 256 this.idempotent = idempotent; 257 } 258 259 public IdempotentRepository getIdempotentRepository() { 260 return idempotentRepository; 261 } 262 263 public void setIdempotentRepository(IdempotentRepository idempotentRepository) { 264 this.idempotentRepository = idempotentRepository; 265 } 266 267 public GenericFileFilter<T> getFilter() { 268 return filter; 269 } 270 271 public void setFilter(GenericFileFilter<T> filter) { 272 this.filter = filter; 273 } 274 275 public Comparator<GenericFile<T>> getSorter() { 276 return sorter; 277 } 278 279 public void setSorter(Comparator<GenericFile<T>> sorter) { 280 this.sorter = sorter; 281 } 282 283 public Comparator<Exchange> getSortBy() { 284 return sortBy; 285 } 286 287 public void setSortBy(Comparator<Exchange> sortBy) { 288 this.sortBy = sortBy; 289 } 290 291 public void setSortBy(String expression) { 292 setSortBy(expression, false); 293 } 294 295 public void setSortBy(String expression, boolean reverse) { 296 setSortBy(GenericFileDefaultSorter.sortByFileLanguage(getCamelContext(), expression, reverse)); 297 } 298 299 public String getTempPrefix() { 300 return tempPrefix; 301 } 302 303 /** 304 * Enables and uses temporary prefix when writing files, after write it will 305 * be renamed to the correct name. 306 */ 307 public void setTempPrefix(String tempPrefix) { 308 this.tempPrefix = tempPrefix; 309 } 310 311 public GenericFileConfiguration getConfiguration() { 312 if (configuration == null) { 313 configuration = new GenericFileConfiguration(); 314 } 315 return configuration; 316 } 317 318 public void setConfiguration(GenericFileConfiguration configuration) { 319 this.configuration = configuration; 320 } 321 322 public GenericFileExclusiveReadLockStrategy getExclusiveReadLockStrategy() { 323 return exclusiveReadLockStrategy; 324 } 325 326 public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy) { 327 this.exclusiveReadLockStrategy = exclusiveReadLockStrategy; 328 } 329 330 public String getReadLock() { 331 return readLock; 332 } 333 334 public void setReadLock(String readLock) { 335 this.readLock = readLock; 336 } 337 338 public long getReadLockTimeout() { 339 return readLockTimeout; 340 } 341 342 public void setReadLockTimeout(long readLockTimeout) { 343 this.readLockTimeout = readLockTimeout; 344 } 345 346 public int getBufferSize() { 347 return bufferSize; 348 } 349 350 public void setBufferSize(int bufferSize) { 351 this.bufferSize = bufferSize; 352 } 353 354 public GenericFileExist getFileExist() { 355 return fileExist; 356 } 357 358 public void setFileExist(GenericFileExist fileExist) { 359 this.fileExist = fileExist; 360 } 361 362 public boolean isAutoCreate() { 363 return autoCreate; 364 } 365 366 public void setAutoCreate(boolean autoCreate) { 367 this.autoCreate = autoCreate; 368 } 369 370 public GenericFileProcessStrategy<T> getProcessStrategy() { 371 return processStrategy; 372 } 373 374 public void setProcessStrategy(GenericFileProcessStrategy<T> processStrategy) { 375 this.processStrategy = processStrategy; 376 } 377 378 public String getLocalWorkDirectory() { 379 return localWorkDirectory; 380 } 381 382 public void setLocalWorkDirectory(String localWorkDirectory) { 383 this.localWorkDirectory = localWorkDirectory; 384 } 385 386 public int getMaxMessagesPerPoll() { 387 return maxMessagesPerPoll; 388 } 389 390 public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { 391 this.maxMessagesPerPoll = maxMessagesPerPoll; 392 } 393 394 public IdempotentRepository<String> getInProgressRepository() { 395 return inProgressRepository; 396 } 397 398 public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) { 399 this.inProgressRepository = inProgressRepository; 400 } 401 402 /** 403 * Configures the given message with the file which sets the body to the 404 * file object. 405 */ 406 public void configureMessage(GenericFile<T> file, Message message) { 407 message.setBody(file); 408 409 if (flatten) { 410 // when flatten the file name should not contain any paths 411 message.setHeader(Exchange.FILE_NAME, file.getFileNameOnly()); 412 } else { 413 // compute name to set on header that should be relative to starting directory 414 String name = file.isAbsolute() ? file.getAbsoluteFilePath() : file.getRelativeFilePath(); 415 416 // skip leading endpoint configured directory 417 String endpointPath = getConfiguration().getDirectory(); 418 if (ObjectHelper.isNotEmpty(endpointPath) && name.startsWith(endpointPath)) { 419 name = ObjectHelper.after(name, getConfiguration().getDirectory() + File.separator); 420 } 421 422 // adjust filename 423 message.setHeader(Exchange.FILE_NAME, name); 424 } 425 } 426 427 /** 428 * Strategy to configure the move or premove option based on a String input. 429 * <p/> 430 * @param expression the original string input 431 * @return configured string or the original if no modifications is needed 432 */ 433 protected String configureMoveOrPreMoveExpression(String expression) { 434 // if the expression already have ${ } placeholders then pass it unmodified 435 if (expression.indexOf("${") != -1) { 436 return expression; 437 } 438 439 // remove trailing slash 440 expression = FileUtil.stripTrailingSeparator(expression); 441 442 StringBuilder sb = new StringBuilder(); 443 444 // if relative then insert start with the parent folder 445 if (!isAbsolute(expression)) { 446 sb.append("${file:parent}"); 447 sb.append(getFileSeparator()); 448 } 449 // insert the directory the end user provided 450 sb.append(expression); 451 // append only the filename (file:name can contain a relative path, so we must use onlyname) 452 sb.append(getFileSeparator()); 453 sb.append("${file:onlyname}"); 454 455 return sb.toString(); 456 } 457 458 protected Map<String, Object> getParamsAsMap() { 459 Map<String, Object> params = new HashMap<String, Object>(); 460 461 if (isNoop()) { 462 params.put("noop", Boolean.toString(true)); 463 } 464 if (isDelete()) { 465 params.put("delete", Boolean.toString(true)); 466 } 467 if (move != null) { 468 params.put("move", move); 469 } 470 if (preMove != null) { 471 params.put("preMove", preMove); 472 } 473 if (exclusiveReadLockStrategy != null) { 474 params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy); 475 } 476 if (readLock != null) { 477 params.put("readLock", readLock); 478 } 479 if (readLockTimeout > 0) { 480 params.put("readLockTimeout", readLockTimeout); 481 } 482 483 return params; 484 } 485 486 private Expression createFileLangugeExpression(String expression) { 487 Language language = getCamelContext().resolveLanguage("file"); 488 return language.createExpression(expression); 489 } 490 }