001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import org.apache.camel.Exchange; 020 import org.apache.camel.impl.LoggingExceptionHandler; 021 import org.apache.camel.spi.ExceptionHandler; 022 import org.apache.camel.spi.Synchronization; 023 import org.apache.commons.logging.Log; 024 import org.apache.commons.logging.LogFactory; 025 026 /** 027 * On completion strategy that performs the nessasary work after the {@link Exchange} has been processed. 028 * <p/> 029 * The work is for example to move the processed file into a backup folder, delete the file or 030 * in case of processing failure do a rollback. 031 * 032 * @version $Revision: 782544 $ 033 */ 034 public class GenericFileOnCompletion<T> implements Synchronization { 035 036 private final transient Log log = LogFactory.getLog(GenericFileOnCompletion.class); 037 private GenericFileEndpoint<T> endpoint; 038 private GenericFileOperations<T> operations; 039 private ExceptionHandler exceptionHandler; 040 041 public GenericFileOnCompletion(GenericFileEndpoint<T> endpoint, GenericFileOperations<T> operations) { 042 this.endpoint = endpoint; 043 this.operations = operations; 044 } 045 046 @SuppressWarnings("unchecked") 047 public void onComplete(Exchange exchange) { 048 onCompletion((GenericFileExchange<T>) exchange); 049 } 050 051 @SuppressWarnings("unchecked") 052 public void onFailure(Exchange exchange) { 053 onCompletion((GenericFileExchange<T>) exchange); 054 } 055 056 public ExceptionHandler getExceptionHandler() { 057 if (exceptionHandler == null) { 058 exceptionHandler = new LoggingExceptionHandler(getClass()); 059 } 060 return exceptionHandler; 061 } 062 063 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 064 this.exceptionHandler = exceptionHandler; 065 } 066 067 protected void onCompletion(GenericFileExchange<T> exchange) { 068 GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 069 070 // after processing 071 final GenericFile<T> file = exchange.getGenericFile(); 072 boolean failed = exchange.isFailed(); 073 074 if (log.isDebugEnabled()) { 075 log.debug("Done processing file: " + file + " using exchange: " + exchange); 076 } 077 078 // commit or rollback 079 boolean committed = false; 080 try { 081 if (!failed) { 082 // commit the file strategy if there was no failure or already handled by the DeadLetterChannel 083 processStrategyCommit(processStrategy, exchange, file); 084 committed = true; 085 } else { 086 if (exchange.getException() != null) { 087 // if the failure was an exception then handle it 088 handleException(exchange.getException()); 089 } 090 } 091 } finally { 092 if (!committed) { 093 processStrategyRollback(processStrategy, exchange, file); 094 } 095 096 // remove file from the in progress list as its no longer in progress 097 endpoint.getInProgressRepository().remove(file.getFileName()); 098 } 099 } 100 101 /** 102 * Strategy when the file was processed and a commit should be executed. 103 * 104 * @param processStrategy the strategy to perform the commit 105 * @param exchange the exchange 106 * @param file the file processed 107 */ 108 @SuppressWarnings("unchecked") 109 protected void processStrategyCommit(GenericFileProcessStrategy<T> processStrategy, 110 GenericFileExchange<T> exchange, GenericFile<T> file) { 111 if (endpoint.isIdempotent()) { 112 // only add to idempotent repository if we could process the file 113 // only use the filename as the key as the file could be moved into a done folder 114 endpoint.getIdempotentRepository().add(file.getFileName()); 115 } 116 117 try { 118 if (log.isTraceEnabled()) { 119 log.trace("Committing remote file strategy: " + processStrategy + " for file: " + file); 120 } 121 processStrategy.commit(operations, endpoint, exchange, file); 122 } catch (Exception e) { 123 handleException(e); 124 } 125 } 126 127 /** 128 * Strategy when the file was not processed and a rollback should be executed. 129 * 130 * @param processStrategy the strategy to perform the commit 131 * @param exchange the exchange 132 * @param file the file processed 133 */ 134 protected void processStrategyRollback(GenericFileProcessStrategy<T> processStrategy, 135 GenericFileExchange<T> exchange, GenericFile<T> file) { 136 if (log.isWarnEnabled()) { 137 log.warn("Rolling back remote file strategy: " + processStrategy + " for file: " + file); 138 } 139 try { 140 processStrategy.rollback(operations, endpoint, exchange, file); 141 } catch (Exception e) { 142 handleException(e); 143 } 144 } 145 146 protected void handleException(Throwable t) { 147 Throwable newt = (t == null) ? new IllegalArgumentException("Handling [null] exception") : t; 148 getExceptionHandler().handleException(newt); 149 } 150 151 @Override 152 public String toString() { 153 return "GenericFileOnCompletion"; 154 } 155 }