001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.idempotent;
018    
019    import org.apache.camel.Exchange;
020    import org.apache.camel.spi.IdempotentRepository;
021    import org.apache.camel.spi.Synchronization;
022    import org.apache.commons.logging.Log;
023    import org.apache.commons.logging.LogFactory;
024    
025    /**
026     * On completion strategy for {@link org.apache.camel.processor.idempotent.IdempotentConsumer}.
027     * <p/>
028     * This strategy adds the message id to the idempotent repository in cast the exchange
029     * was processed successfully. In case of failure the message id is <b>not</b> added.
030     *
031     * @version $Revision: 782534 $
032     */
033    public class IdempotentOnCompletion implements Synchronization {
034        private static final transient Log LOG = LogFactory.getLog(IdempotentOnCompletion.class);
035        private final IdempotentRepository idempotentRepository;
036        private final String messageId;
037        private final boolean eager;
038    
039        public IdempotentOnCompletion(IdempotentRepository idempotentRepository, String messageId, boolean eager) {
040            this.idempotentRepository = idempotentRepository;
041            this.messageId = messageId;
042            this.eager = eager;
043        }
044    
045        public void onComplete(Exchange exchange) {
046            onCompletedMessage(exchange, messageId);
047        }
048    
049        public void onFailure(Exchange exchange) {
050            onFailedMessage(exchange, messageId);
051        }
052    
053        /**
054         * A strategy method to allow derived classes to overload the behaviour of
055         * processing a completed message
056         *
057         * @param exchange the exchange
058         * @param messageId the message ID of this exchange
059         */
060        @SuppressWarnings("unchecked")
061        protected void onCompletedMessage(Exchange exchange, String messageId) {
062            if (!eager) {
063                // if not eager we should add the key when its complete
064                idempotentRepository.add(messageId);
065            }
066            idempotentRepository.confirm(messageId);
067        }
068    
069        /**
070         * A strategy method to allow derived classes to overload the behaviour of
071         * processing a failed message
072         *
073         * @param exchange the exchange
074         * @param messageId the message ID of this exchange
075         */
076        @SuppressWarnings("unchecked")
077        protected void onFailedMessage(Exchange exchange, String messageId) {
078            idempotentRepository.remove(messageId);
079            if (LOG.isDebugEnabled()) {
080                LOG.debug("Removed from repository as exchange failed: " + exchange + " with id: " + messageId);
081            }
082        }
083    
084        @Override
085        public String toString() {
086            return "IdempotentOnCompletion[" + messageId + ']';
087        }
088    
089    }