001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file.strategy;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.Channel;
023    import java.nio.channels.FileChannel;
024    import java.nio.channels.FileLock;
025    
026    import org.apache.camel.Exchange;
027    import org.apache.camel.component.file.GenericFile;
028    import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
029    import org.apache.camel.component.file.GenericFileOperations;
030    import org.apache.camel.util.ExchangeHelper;
031    import org.apache.camel.util.ObjectHelper;
032    import org.apache.commons.logging.Log;
033    import org.apache.commons.logging.LogFactory;
034    
035    /**
036     * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
037     * After granting the read lock it is released, we just want to make sure that when we start
038     * consuming the file its not currently in progress of being written by third party.
039     */
040    public class FileLockExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
041        private static final transient Log LOG = LogFactory.getLog(FileLockExclusiveReadLockStrategy.class);
042        private long timeout;
043    
044        public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
045            File target = new File(file.getAbsoluteFilePath());
046    
047            if (LOG.isTraceEnabled()) {
048                LOG.trace("Waiting for exclusive read lock to file: " + target);
049            }
050    
051            try {
052                // try to acquire rw lock on the file before we can consume it
053                FileChannel channel = new RandomAccessFile(target, "rw").getChannel();
054    
055                long start = System.currentTimeMillis();
056                boolean exclusive = false;
057    
058                while (!exclusive) {
059                    // timeout check
060                    if (timeout > 0) {
061                        long delta = System.currentTimeMillis() - start;
062                        if (delta > timeout) {
063                            LOG.debug("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
064                            // we could not get the lock within the timeout period, so return false
065                            return false;
066                        }
067                    }
068    
069                    // get the lock using either try lock or not depending on if we are using timeout or not
070                    FileLock lock = null;
071                    try {
072                        lock = timeout > 0 ? channel.tryLock() : channel.lock();
073                    } catch (IllegalStateException ex) {
074                        // Also catch the OverlappingFileLockException here. Do nothing here
075                    }
076                    if (lock != null) {
077                        if (LOG.isTraceEnabled()) {
078                            LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + target);
079                        }
080    
081                        // store lock so we can release it later
082                        exchange.setProperty("CamelFileLock", lock);
083                        exchange.setProperty("CamelFileLockName", target.getName());
084    
085                        exclusive = true;
086                    } else {
087                        boolean interrupted = sleep();
088                        if (interrupted) {
089                            // we were interrputed while sleeping, we are likely being shutdown so return false
090                            return false;
091                        }
092                    }
093                }
094            } catch (IOException e) {
095                // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
096                // such as AntiVirus or MS Office that has special locks for it's supported files
097                if (timeout == 0) {
098                    // if not using timeout, then we cant retry, so rethrow
099                    throw e;
100                }
101                if (LOG.isDebugEnabled()) {
102                    LOG.debug("Cannot acquire read lock. Will try again.", e);
103                }
104                boolean interrupted = sleep();
105                if (interrupted) {
106                    // we were interrputed while sleeping, we are likely being shutdown so return false
107                    return false;
108                }
109            }
110    
111            return true;
112        }
113    
114        public void releaseExclusiveReadLock(GenericFileOperations<File> fileGenericFileOperations,
115                                             GenericFile<File> fileGenericFile, Exchange exchange) throws Exception {
116            FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLock", FileLock.class);
117            String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "CamelFileLockName", String.class);
118            Channel channel = lock.channel();
119            try {
120                lock.release();
121            } finally {
122                // must close channel first
123                ObjectHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG);
124                // need to delete the lock file
125                File lockfile = new File(lockFileName);
126                boolean deleted = lockfile.delete();
127                if (LOG.isTraceEnabled()) {
128                    LOG.trace("Exclusive lock file: " + lockFileName + " was deleted: " + deleted);
129                }
130            }
131        }
132    
133        private boolean sleep() {
134            LOG.trace("Exclusive read lock not granted. Sleeping for 1000 millis.");
135            try {
136                Thread.sleep(1000);
137                return true;
138            } catch (InterruptedException e) {
139                LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
140                return false;
141            }
142        }
143    
144        public long getTimeout() {
145            return timeout;
146        }
147    
148        /**
149         * Sets an optional timeout period.
150         * <p/>
151         * If the readlock could not be granted within the timeperiod then the wait is stopped and the
152         * acquireReadLock returns <tt>false</tt>.
153         *
154         * @param timeout period in millis
155         */
156        public void setTimeout(long timeout) {
157            this.timeout = timeout;
158        }
159    
160    }