001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.lang.reflect.Method; 022 import java.util.Comparator; 023 import java.util.HashMap; 024 import java.util.Map; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Component; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Expression; 030 import org.apache.camel.Message; 031 import org.apache.camel.Processor; 032 import org.apache.camel.impl.ScheduledPollEndpoint; 033 import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; 034 import org.apache.camel.spi.FactoryFinder; 035 import org.apache.camel.spi.IdempotentRepository; 036 import org.apache.camel.spi.Language; 037 import org.apache.camel.util.FileUtil; 038 import org.apache.camel.util.ObjectHelper; 039 import org.apache.camel.util.UuidGenerator; 040 import org.apache.commons.logging.Log; 041 import org.apache.commons.logging.LogFactory; 042 043 /** 044 * Generic FileEndpoint 045 */ 046 public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint { 047 048 protected static final transient String DEFAULT_STRATEGYFACTORY_CLASS = "org.apache.camel.component.file.strategy.GenericFileProcessStrategyFactory"; 049 protected static final transient int DEFAULT_IDEMPOTENT_CACHE_SIZE = 1000; 050 051 protected final transient Log log = LogFactory.getLog(getClass()); 052 053 protected GenericFileProcessStrategy<T> processStrategy; 054 protected GenericFileConfiguration configuration; 055 056 protected IdempotentRepository<String> inProgressRepository = new MemoryIdempotentRepository(); 057 protected String localWorkDirectory; 058 protected boolean autoCreate = true; 059 protected int bufferSize = 128 * 1024; 060 protected GenericFileExist fileExist = GenericFileExist.Override; 061 protected boolean noop; 062 protected boolean recursive; 063 protected boolean delete; 064 protected boolean flatten; 065 protected int maxMessagesPerPoll; 066 protected String tempPrefix; 067 protected String include; 068 protected String exclude; 069 protected Expression fileName; 070 protected Expression move; 071 protected Expression preMove; 072 protected boolean idempotent; 073 protected IdempotentRepository idempotentRepository; 074 protected GenericFileFilter<T> filter; 075 protected Comparator<GenericFile<T>> sorter; 076 protected Comparator<GenericFileExchange> sortBy; 077 protected String readLock = "none"; 078 protected long readLockTimeout; 079 protected GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy; 080 081 public GenericFileEndpoint() { 082 } 083 084 public GenericFileEndpoint(String endpointUri, Component component) { 085 super(endpointUri, component); 086 } 087 088 public boolean isSingleton() { 089 return true; 090 } 091 092 public abstract GenericFileConsumer<T> createConsumer(Processor processor) throws Exception; 093 094 public abstract GenericFileProducer<T> createProducer() throws Exception; 095 096 public abstract GenericFileExchange<T> createExchange(GenericFile<T> file); 097 098 public abstract String getScheme(); 099 100 public abstract char getFileSeparator(); 101 102 public abstract boolean isAbsolute(String name); 103 104 /** 105 * Return the file name that will be auto-generated for the given message if 106 * none is provided 107 */ 108 public String getGeneratedFileName(Message message) { 109 return UuidGenerator.generateSanitizedId(message.getMessageId()); 110 } 111 112 public GenericFileProcessStrategy<T> getGenericFileProcessStrategy() { 113 if (processStrategy == null) { 114 processStrategy = createGenericFileStrategy(); 115 if (log.isDebugEnabled()) { 116 log.debug("Using Generic file process strategy: " + processStrategy); 117 } 118 } 119 return processStrategy; 120 } 121 122 /** 123 * A strategy method to lazily create the file strategy 124 */ 125 @SuppressWarnings("unchecked") 126 protected GenericFileProcessStrategy<T> createGenericFileStrategy() { 127 Class<?> factory = null; 128 try { 129 FactoryFinder finder = getCamelContext().getFactoryFinder("META-INF/services/org/apache/camel/component/"); 130 factory = finder.findClass(getScheme(), "strategy.factory."); 131 } catch (ClassNotFoundException e) { 132 log.debug("'strategy.factory.class' not found", e); 133 } catch (IOException e) { 134 log.debug("No strategy factory defined in 'META-INF/services/org/apache/camel/component/'", e); 135 } 136 137 if (factory == null) { 138 // use default 139 factory = this.getCamelContext().getClassResolver().resolveClass(DEFAULT_STRATEGYFACTORY_CLASS); 140 if (factory == null) { 141 throw new TypeNotPresentException(DEFAULT_STRATEGYFACTORY_CLASS + " class not found", null); 142 } 143 } 144 145 try { 146 Method factoryMethod = factory.getMethod("createGenericFileProcessStrategy", CamelContext.class, Map.class); 147 return (GenericFileProcessStrategy<T>) ObjectHelper.invokeMethod(factoryMethod, null, getCamelContext(), getParamsAsMap()); 148 } catch (NoSuchMethodException e) { 149 throw new TypeNotPresentException(factory.getSimpleName() + ".createGenericFileProcessStrategy method not found", e); 150 } 151 } 152 153 public void setGenericFileProcessStrategy(GenericFileProcessStrategy<T> genericFileProcessStrategy) { 154 this.processStrategy = genericFileProcessStrategy; 155 } 156 157 public boolean isNoop() { 158 return noop; 159 } 160 161 public void setNoop(boolean noop) { 162 this.noop = noop; 163 } 164 165 public boolean isRecursive() { 166 return recursive; 167 } 168 169 public void setRecursive(boolean recursive) { 170 this.recursive = recursive; 171 } 172 173 public String getInclude() { 174 return include; 175 } 176 177 public void setInclude(String include) { 178 this.include = include; 179 } 180 181 public String getExclude() { 182 return exclude; 183 } 184 185 public void setExclude(String exclude) { 186 this.exclude = exclude; 187 } 188 189 public boolean isDelete() { 190 return delete; 191 } 192 193 public void setDelete(boolean delete) { 194 this.delete = delete; 195 } 196 197 public boolean isFlatten() { 198 return flatten; 199 } 200 201 public void setFlatten(boolean flatten) { 202 this.flatten = flatten; 203 } 204 205 public Expression getMove() { 206 return move; 207 } 208 209 public void setMove(Expression move) { 210 this.move = move; 211 } 212 213 /** 214 * Sets the move expression based on 215 * {@link org.apache.camel.language.simple.FileLanguage} 216 */ 217 public void setMove(String fileLanguageExpression) { 218 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression); 219 this.move = createFileLangugeExpression(expression); 220 } 221 222 public Expression getPreMove() { 223 return preMove; 224 } 225 226 public void setPreMove(Expression preMove) { 227 this.preMove = preMove; 228 } 229 230 /** 231 * Sets the pre move expression based on 232 * {@link org.apache.camel.language.simple.FileLanguage} 233 */ 234 public void setPreMove(String fileLanguageExpression) { 235 String expression = configureMoveOrPreMoveExpression(fileLanguageExpression); 236 this.preMove = createFileLangugeExpression(expression); 237 } 238 239 public Expression getFileName() { 240 return fileName; 241 } 242 243 public void setFileName(Expression fileName) { 244 this.fileName = fileName; 245 } 246 247 /** 248 * Sets the file expression based on 249 * {@link org.apache.camel.language.simple.FileLanguage} 250 */ 251 public void setFileName(String fileLanguageExpression) { 252 this.fileName = createFileLangugeExpression(fileLanguageExpression); 253 } 254 255 public boolean isIdempotent() { 256 return idempotent; 257 } 258 259 public void setIdempotent(boolean idempotent) { 260 this.idempotent = idempotent; 261 } 262 263 public IdempotentRepository getIdempotentRepository() { 264 return idempotentRepository; 265 } 266 267 public void setIdempotentRepository(IdempotentRepository idempotentRepository) { 268 this.idempotentRepository = idempotentRepository; 269 } 270 271 public GenericFileFilter<T> getFilter() { 272 return filter; 273 } 274 275 public void setFilter(GenericFileFilter<T> filter) { 276 this.filter = filter; 277 } 278 279 public Comparator<GenericFile<T>> getSorter() { 280 return sorter; 281 } 282 283 public void setSorter(Comparator<GenericFile<T>> sorter) { 284 this.sorter = sorter; 285 } 286 287 public Comparator<GenericFileExchange> getSortBy() { 288 return sortBy; 289 } 290 291 public void setSortBy(Comparator<GenericFileExchange> sortBy) { 292 this.sortBy = sortBy; 293 } 294 295 public void setSortBy(String expression) { 296 setSortBy(expression, false); 297 } 298 299 public void setSortBy(String expression, boolean reverse) { 300 setSortBy(GenericFileDefaultSorter.sortByFileLanguage(getCamelContext(), expression, reverse)); 301 } 302 303 public String getTempPrefix() { 304 return tempPrefix; 305 } 306 307 /** 308 * Enables and uses temporary prefix when writing files, after write it will 309 * be renamed to the correct name. 310 */ 311 public void setTempPrefix(String tempPrefix) { 312 this.tempPrefix = tempPrefix; 313 } 314 315 public GenericFileConfiguration getConfiguration() { 316 if (configuration == null) { 317 configuration = new GenericFileConfiguration(); 318 } 319 return configuration; 320 } 321 322 public void setConfiguration(GenericFileConfiguration configuration) { 323 this.configuration = configuration; 324 } 325 326 public GenericFileExclusiveReadLockStrategy getExclusiveReadLockStrategy() { 327 return exclusiveReadLockStrategy; 328 } 329 330 public void setExclusiveReadLockStrategy(GenericFileExclusiveReadLockStrategy exclusiveReadLockStrategy) { 331 this.exclusiveReadLockStrategy = exclusiveReadLockStrategy; 332 } 333 334 public String getReadLock() { 335 return readLock; 336 } 337 338 public void setReadLock(String readLock) { 339 this.readLock = readLock; 340 } 341 342 public long getReadLockTimeout() { 343 return readLockTimeout; 344 } 345 346 public void setReadLockTimeout(long readLockTimeout) { 347 this.readLockTimeout = readLockTimeout; 348 } 349 350 public int getBufferSize() { 351 return bufferSize; 352 } 353 354 public void setBufferSize(int bufferSize) { 355 this.bufferSize = bufferSize; 356 } 357 358 public GenericFileExist getFileExist() { 359 return fileExist; 360 } 361 362 public void setFileExist(GenericFileExist fileExist) { 363 this.fileExist = fileExist; 364 } 365 366 public boolean isAutoCreate() { 367 return autoCreate; 368 } 369 370 public void setAutoCreate(boolean autoCreate) { 371 this.autoCreate = autoCreate; 372 } 373 374 public GenericFileProcessStrategy<T> getProcessStrategy() { 375 return processStrategy; 376 } 377 378 public void setProcessStrategy(GenericFileProcessStrategy<T> processStrategy) { 379 this.processStrategy = processStrategy; 380 } 381 382 public String getLocalWorkDirectory() { 383 return localWorkDirectory; 384 } 385 386 public void setLocalWorkDirectory(String localWorkDirectory) { 387 this.localWorkDirectory = localWorkDirectory; 388 } 389 390 public int getMaxMessagesPerPoll() { 391 return maxMessagesPerPoll; 392 } 393 394 public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { 395 this.maxMessagesPerPoll = maxMessagesPerPoll; 396 } 397 398 public IdempotentRepository<String> getInProgressRepository() { 399 return inProgressRepository; 400 } 401 402 public void setInProgressRepository(IdempotentRepository<String> inProgressRepository) { 403 this.inProgressRepository = inProgressRepository; 404 } 405 406 /** 407 * Configures the given message with the file which sets the body to the 408 * file object. 409 */ 410 public void configureMessage(GenericFile<T> file, Message message) { 411 message.setBody(file); 412 413 if (flatten) { 414 // when flatten the file name should not contain any paths 415 message.setHeader(Exchange.FILE_NAME, file.getFileNameOnly()); 416 } else { 417 // compute name to set on header that should be relative to starting directory 418 String name = file.isAbsolute() ? file.getAbsoluteFilePath() : file.getRelativeFilePath(); 419 420 // skip leading endpoint configured directory 421 String endpointPath = getConfiguration().getDirectory(); 422 if (ObjectHelper.isNotEmpty(endpointPath) && name.startsWith(endpointPath)) { 423 name = ObjectHelper.after(name, getConfiguration().getDirectory() + File.separator); 424 } 425 426 // adjust filename 427 message.setHeader(Exchange.FILE_NAME, name); 428 } 429 } 430 431 /** 432 * Strategy to configure the move or premove option based on a String input. 433 * <p/> 434 * @param expression the original string input 435 * @return configured string or the original if no modifications is needed 436 */ 437 protected String configureMoveOrPreMoveExpression(String expression) { 438 // if the expression already have ${ } placeholders then pass it unmodified 439 if (expression.indexOf("${") != -1) { 440 return expression; 441 } 442 443 // remove trailing slash 444 expression = FileUtil.stripTrailingSeparator(expression); 445 446 StringBuilder sb = new StringBuilder(); 447 448 // if relative then insert start with the parent folder 449 if (!isAbsolute(expression)) { 450 sb.append("${file:parent}"); 451 sb.append(getFileSeparator()); 452 } 453 // insert the directory the end user provided 454 sb.append(expression); 455 // append only the filename (file:name can contain a relative path, so we must use onlyname) 456 sb.append(getFileSeparator()); 457 sb.append("${file:onlyname}"); 458 459 return sb.toString(); 460 } 461 462 protected Map<String, Object> getParamsAsMap() { 463 Map<String, Object> params = new HashMap<String, Object>(); 464 465 if (isNoop()) { 466 params.put("noop", Boolean.toString(true)); 467 } 468 if (isDelete()) { 469 params.put("delete", Boolean.toString(true)); 470 } 471 if (move != null) { 472 params.put("move", move); 473 } 474 if (preMove != null) { 475 params.put("preMove", preMove); 476 } 477 if (exclusiveReadLockStrategy != null) { 478 params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy); 479 } 480 if (readLock != null) { 481 params.put("readLock", readLock); 482 } 483 if (readLockTimeout > 0) { 484 params.put("readLockTimeout", readLockTimeout); 485 } 486 487 return params; 488 } 489 490 private Expression createFileLangugeExpression(String expression) { 491 Language language = getCamelContext().resolveLanguage("file"); 492 return language.createExpression(expression); 493 } 494 495 }